Skip to main content

datum/stream/
time.rs

1use super::*;
2use crate::stream::runtime::current_stream_cancelled;
3use std::collections::VecDeque;
4use std::mem;
5use std::panic::{AssertUnwindSafe, catch_unwind};
6use std::time::Instant;
7
8const DELAY_BUFFER_CAPACITY: usize = 16;
9const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum ThrottleMode {
13    Shaping,
14    Enforcing,
15}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum DelayOverflowStrategy {
19    EmitEarly,
20    DropHead,
21    DropTail,
22    DropBuffer,
23    DropNew,
24    Backpressure,
25    Fail,
26}
27
28#[derive(Clone)]
29enum TerminalSignal {
30    Complete,
31    Error(StreamError),
32}
33
34struct DelayQueueShared<T> {
35    state: Mutex<DelayQueueState<T>>,
36    available: Condvar,
37    cancelled: Arc<AtomicBool>,
38}
39
40struct DelayQueueState<T> {
41    queue: VecDeque<(Instant, T)>,
42    terminal: Option<TerminalSignal>,
43}
44
45impl<T> DelayQueueShared<T> {
46    fn new() -> Arc<Self> {
47        Arc::new(Self {
48            state: Mutex::new(DelayQueueState {
49                queue: VecDeque::new(),
50                terminal: None,
51            }),
52            available: Condvar::new(),
53            cancelled: Arc::new(AtomicBool::new(false)),
54        })
55    }
56}
57
58struct DelayQueueStream<T> {
59    shared: Arc<DelayQueueShared<T>>,
60    completion: Option<StreamCompletion<NotUsed>>,
61}
62
63impl<T> Iterator for DelayQueueStream<T> {
64    type Item = StreamResult<T>;
65
66    fn next(&mut self) -> Option<Self::Item> {
67        let mut state = self
68            .shared
69            .state
70            .lock()
71            .unwrap_or_else(|poison| poison.into_inner());
72        loop {
73            if let Some((deadline, _)) = state.queue.front() {
74                let now = Instant::now();
75                if *deadline <= now {
76                    let (_, item) = state.queue.pop_front().expect("front element present");
77                    drop(state);
78                    self.shared.available.notify_all();
79                    return Some(Ok(item));
80                }
81                let wait = (*deadline - now).min(WAIT_POLL_INTERVAL);
82                let (next, _) = self
83                    .shared
84                    .available
85                    .wait_timeout(state, wait)
86                    .unwrap_or_else(|poison| poison.into_inner());
87                state = next;
88                continue;
89            }
90
91            if let Some(terminal) = state.terminal.clone() {
92                return match terminal {
93                    TerminalSignal::Complete => None,
94                    TerminalSignal::Error(error) => Some(Err(error)),
95                };
96            }
97            if self.shared.cancelled.load(Ordering::SeqCst) {
98                return Some(Err(StreamError::Cancelled));
99            }
100
101            let (next, _) = self
102                .shared
103                .available
104                .wait_timeout(state, WAIT_POLL_INTERVAL)
105                .unwrap_or_else(|poison| poison.into_inner());
106            state = next;
107        }
108    }
109}
110
111impl<T> Drop for DelayQueueStream<T> {
112    fn drop(&mut self) {
113        self.shared.cancelled.store(true, Ordering::SeqCst);
114        self.shared.available.notify_all();
115        let _ = self.completion.take();
116    }
117}
118
119struct SlotShared<T, Extra> {
120    state: Mutex<SlotState<T, Extra>>,
121    available: Condvar,
122    cancelled: Arc<AtomicBool>,
123}
124
125struct SlotState<T, Extra> {
126    slot: Option<T>,
127    terminal: Option<TerminalSignal>,
128    extra: Extra,
129}
130
131impl<T, Extra> SlotShared<T, Extra> {
132    fn new(extra: Extra) -> Arc<Self> {
133        Arc::new(Self {
134            state: Mutex::new(SlotState {
135                slot: None,
136                terminal: None,
137                extra,
138            }),
139            available: Condvar::new(),
140            cancelled: Arc::new(AtomicBool::new(false)),
141        })
142    }
143}
144
145struct SlotStream<T, Extra> {
146    shared: Arc<SlotShared<T, Extra>>,
147    completion: Option<StreamCompletion<NotUsed>>,
148}
149
150impl<T, Extra> Iterator for SlotStream<T, Extra> {
151    type Item = StreamResult<T>;
152
153    fn next(&mut self) -> Option<Self::Item> {
154        let mut state = self
155            .shared
156            .state
157            .lock()
158            .unwrap_or_else(|poison| poison.into_inner());
159        loop {
160            if let Some(item) = state.slot.take() {
161                drop(state);
162                self.shared.available.notify_all();
163                return Some(Ok(item));
164            }
165            if let Some(terminal) = state.terminal.clone() {
166                return match terminal {
167                    TerminalSignal::Complete => None,
168                    TerminalSignal::Error(error) => Some(Err(error)),
169                };
170            }
171            if self.shared.cancelled.load(Ordering::SeqCst) {
172                return Some(Err(StreamError::Cancelled));
173            }
174            let (next, _) = self
175                .shared
176                .available
177                .wait_timeout(state, WAIT_POLL_INTERVAL)
178                .unwrap_or_else(|poison| poison.into_inner());
179            state = next;
180        }
181    }
182}
183
184impl<T, Extra> Drop for SlotStream<T, Extra> {
185    fn drop(&mut self) {
186        self.shared.cancelled.store(true, Ordering::SeqCst);
187        self.shared.available.notify_all();
188        let _ = self.completion.take();
189    }
190}
191
192fn finish_delay_queue<T>(shared: &DelayQueueShared<T>, terminal: TerminalSignal) {
193    let mut state = shared
194        .state
195        .lock()
196        .unwrap_or_else(|poison| poison.into_inner());
197    if state.terminal.is_none() {
198        state.terminal = Some(terminal);
199    }
200    drop(state);
201    shared.available.notify_all();
202}
203
204fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
205    let mut state = shared
206        .state
207        .lock()
208        .unwrap_or_else(|poison| poison.into_inner());
209    if state.terminal.is_none() {
210        state.terminal = Some(terminal);
211    }
212    drop(state);
213    shared.available.notify_all();
214}
215
216struct QueuePanicGuard<T> {
217    shared: Arc<DelayQueueShared<T>>,
218    armed: bool,
219}
220
221impl<T> QueuePanicGuard<T> {
222    fn new(shared: Arc<DelayQueueShared<T>>) -> Self {
223        Self {
224            shared,
225            armed: true,
226        }
227    }
228
229    fn disarm(&mut self) {
230        self.armed = false;
231    }
232}
233
234impl<T> Drop for QueuePanicGuard<T> {
235    fn drop(&mut self) {
236        if self.armed {
237            finish_delay_queue(
238                &self.shared,
239                TerminalSignal::Error(StreamError::AbruptTermination),
240            );
241        }
242    }
243}
244
245struct SlotPanicGuard<T, Extra> {
246    shared: Arc<SlotShared<T, Extra>>,
247    armed: bool,
248}
249
250impl<T, Extra> SlotPanicGuard<T, Extra> {
251    fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
252        Self {
253            shared,
254            armed: true,
255        }
256    }
257
258    fn disarm(&mut self) {
259        self.armed = false;
260    }
261}
262
263impl<T, Extra> Drop for SlotPanicGuard<T, Extra> {
264    fn drop(&mut self) {
265        if self.armed {
266            finish_slot(
267                &self.shared,
268                TerminalSignal::Error(StreamError::AbruptTermination),
269            );
270        }
271    }
272}
273
274fn cloned_materializer(materializer: &Materializer) -> Materializer {
275    materializer.with_name_prefix(Arc::from(materializer.name_prefix()))
276}
277
278fn wait_for_timer(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
279    if delay.is_zero() {
280        return Ok(());
281    }
282    let gate = Arc::new((Mutex::new(false), Condvar::new()));
283    let gate_task = Arc::clone(&gate);
284    let _timer = materializer.schedule_once(delay, move || {
285        let (lock, condvar) = &*gate_task;
286        let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
287        *done = true;
288        drop(done);
289        condvar.notify_all();
290    });
291
292    let (lock, condvar) = &*gate;
293    let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
294    while !*done {
295        if materializer.is_shutdown() {
296            return Err(StreamError::AbruptTermination);
297        }
298        if current_stream_cancelled()
299            .as_ref()
300            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
301        {
302            return Err(StreamError::Cancelled);
303        }
304        let (next, _) = condvar
305            .wait_timeout(done, WAIT_POLL_INTERVAL)
306            .unwrap_or_else(|poison| poison.into_inner());
307        done = next;
308    }
309    Ok(())
310}
311
312#[derive(Debug)]
313struct TokenBucket {
314    available: f64,
315    last: Instant,
316    capacity: f64,
317    nanos_between_tokens: f64,
318}
319
320impl TokenBucket {
321    fn new(cost: u64, per: Duration, maximum_burst: i32) -> Self {
322        assert!(cost > 0, "throttle cost must be greater than zero");
323        assert!(
324            per > Duration::ZERO,
325            "throttle period must be greater than zero"
326        );
327        assert!(
328            per.as_nanos() >= u128::from(cost),
329            "Rates larger than 1 unit / nanosecond are not supported"
330        );
331        assert!(maximum_burst >= -1, "maximum_burst must be -1 or greater");
332        let nanos_between_tokens = per.as_nanos() as f64 / cost as f64;
333        let capacity = if maximum_burst == -1 {
334            let automatic = ((100_000_000_f64 / nanos_between_tokens).max(1.0)).floor();
335            automatic.max(1.0)
336        } else {
337            maximum_burst as f64
338        };
339        Self {
340            available: capacity,
341            last: Instant::now(),
342            capacity,
343            nanos_between_tokens,
344        }
345    }
346
347    fn offer(&mut self, cost: u64) -> Duration {
348        let now = Instant::now();
349        if now > self.last {
350            let elapsed = now.duration_since(self.last).as_nanos() as f64;
351            let replenished = elapsed / self.nanos_between_tokens;
352            self.available = (self.available + replenished).min(self.capacity);
353            self.last = now;
354        }
355
356        let cost = cost as f64;
357        if self.available >= cost {
358            self.available -= cost;
359            Duration::ZERO
360        } else {
361            let needed = cost - self.available;
362            self.available = 0.0;
363            let delay_nanos = (needed * self.nanos_between_tokens).ceil() as u64;
364            self.last = now + Duration::from_nanos(delay_nanos);
365            Duration::from_nanos(delay_nanos)
366        }
367    }
368}
369
370fn schedule_notify<T>(
371    materializer: &Materializer,
372    shared: Arc<DelayQueueShared<T>>,
373    delay: Duration,
374) where
375    T: Send + 'static,
376{
377    if delay.is_zero() {
378        shared.available.notify_all();
379        return;
380    }
381    let _timer = materializer.schedule_once(delay, move || {
382        shared.available.notify_all();
383    });
384}
385
386fn delay_stage<Out, Supplier, Strategy>(
387    input: BoxStream<Out>,
388    delay_strategy_supplier: Arc<Supplier>,
389    overflow_strategy: DelayOverflowStrategy,
390    materializer: &Materializer,
391) -> StreamResult<BoxStream<Out>>
392where
393    Out: Send + 'static,
394    Supplier: Fn() -> Strategy + Send + Sync + 'static,
395    Strategy: FnMut(&Out) -> Duration + Send + 'static,
396{
397    let shared = DelayQueueShared::new();
398    let producer_shared = Arc::clone(&shared);
399    let cancelled = Arc::clone(&shared.cancelled);
400    let state = Arc::clone(&materializer.inner.state);
401    let materializer = cloned_materializer(materializer);
402    let task_materializer = materializer.clone();
403    let completion = materializer.spawn_stream(move |_| {
404        let mut panic_guard = QueuePanicGuard::new(Arc::clone(&producer_shared));
405        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
406        let mut delay_strategy = delay_strategy_supplier();
407
408        loop {
409            if cancelled.load(Ordering::SeqCst) {
410                panic_guard.disarm();
411                return Ok(NotUsed);
412            }
413
414            match input.next() {
415                Some(Ok(item)) => {
416                    let delay = match catch_unwind(AssertUnwindSafe(|| delay_strategy(&item))) {
417                        Ok(delay) => delay,
418                        Err(_) => {
419                            panic_guard.disarm();
420                            finish_delay_queue(
421                                &producer_shared,
422                                TerminalSignal::Error(StreamError::AbruptTermination),
423                            );
424                            return Ok(NotUsed);
425                        }
426                    };
427
428                    let deadline = Instant::now() + delay;
429                    let mut state = producer_shared
430                        .state
431                        .lock()
432                        .unwrap_or_else(|poison| poison.into_inner());
433
434                    match overflow_strategy {
435                        DelayOverflowStrategy::Backpressure => {
436                            while state.queue.len() == DELAY_BUFFER_CAPACITY
437                                && !cancelled.load(Ordering::SeqCst)
438                            {
439                                state = producer_shared
440                                    .available
441                                    .wait(state)
442                                    .unwrap_or_else(|poison| poison.into_inner());
443                            }
444                            if cancelled.load(Ordering::SeqCst) {
445                                panic_guard.disarm();
446                                return Ok(NotUsed);
447                            }
448                            let was_empty = state.queue.is_empty();
449                            state.queue.push_back((deadline, item));
450                            drop(state);
451                            if was_empty {
452                                schedule_notify(
453                                    &task_materializer,
454                                    Arc::clone(&producer_shared),
455                                    delay,
456                                );
457                            }
458                            producer_shared.available.notify_all();
459                        }
460                        DelayOverflowStrategy::DropHead => {
461                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
462                                let _ = state.queue.pop_front();
463                            }
464                            let was_empty = state.queue.is_empty();
465                            state.queue.push_back((deadline, item));
466                            drop(state);
467                            if was_empty {
468                                schedule_notify(
469                                    &task_materializer,
470                                    Arc::clone(&producer_shared),
471                                    delay,
472                                );
473                            }
474                            producer_shared.available.notify_all();
475                        }
476                        DelayOverflowStrategy::DropTail => {
477                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
478                                let _ = state.queue.pop_back();
479                            }
480                            let was_empty = state.queue.is_empty();
481                            state.queue.push_back((deadline, item));
482                            drop(state);
483                            if was_empty {
484                                schedule_notify(
485                                    &task_materializer,
486                                    Arc::clone(&producer_shared),
487                                    delay,
488                                );
489                            }
490                            producer_shared.available.notify_all();
491                        }
492                        DelayOverflowStrategy::DropBuffer => {
493                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
494                                state.queue.clear();
495                            }
496                            let was_empty = state.queue.is_empty();
497                            state.queue.push_back((deadline, item));
498                            drop(state);
499                            if was_empty {
500                                schedule_notify(
501                                    &task_materializer,
502                                    Arc::clone(&producer_shared),
503                                    delay,
504                                );
505                            }
506                            producer_shared.available.notify_all();
507                        }
508                        DelayOverflowStrategy::DropNew => {
509                            if state.queue.len() < DELAY_BUFFER_CAPACITY {
510                                let was_empty = state.queue.is_empty();
511                                state.queue.push_back((deadline, item));
512                                drop(state);
513                                if was_empty {
514                                    schedule_notify(
515                                        &task_materializer,
516                                        Arc::clone(&producer_shared),
517                                        delay,
518                                    );
519                                }
520                                producer_shared.available.notify_all();
521                            }
522                        }
523                        DelayOverflowStrategy::Fail => {
524                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
525                                state.queue.clear();
526                                drop(state);
527                                panic_guard.disarm();
528                                finish_delay_queue(
529                                    &producer_shared,
530                                    TerminalSignal::Error(StreamError::Failed(format!(
531                                        "Buffer overflow for delay operator (max capacity was: {DELAY_BUFFER_CAPACITY})!"
532                                    ))),
533                                );
534                                return Ok(NotUsed);
535                            }
536                            let was_empty = state.queue.is_empty();
537                            state.queue.push_back((deadline, item));
538                            drop(state);
539                            if was_empty {
540                                schedule_notify(
541                                    &task_materializer,
542                                    Arc::clone(&producer_shared),
543                                    delay,
544                                );
545                            }
546                            producer_shared.available.notify_all();
547                        }
548                        DelayOverflowStrategy::EmitEarly => {
549                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
550                                if let Some((early_deadline, _)) = state.queue.front_mut() {
551                                    *early_deadline = Instant::now();
552                                }
553                                drop(state);
554                                producer_shared.available.notify_all();
555                                state = producer_shared
556                                    .state
557                                    .lock()
558                                    .unwrap_or_else(|poison| poison.into_inner());
559                                while state.queue.len() == DELAY_BUFFER_CAPACITY
560                                    && !cancelled.load(Ordering::SeqCst)
561                                {
562                                    state = producer_shared
563                                        .available
564                                        .wait(state)
565                                        .unwrap_or_else(|poison| poison.into_inner());
566                                }
567                                if cancelled.load(Ordering::SeqCst) {
568                                    panic_guard.disarm();
569                                    return Ok(NotUsed);
570                                }
571                            }
572                            let was_empty = state.queue.is_empty();
573                            state.queue.push_back((deadline, item));
574                            drop(state);
575                            if was_empty {
576                                schedule_notify(
577                                    &task_materializer,
578                                    Arc::clone(&producer_shared),
579                                    delay,
580                                );
581                            }
582                            producer_shared.available.notify_all();
583                        }
584                    }
585                }
586                Some(Err(error)) => {
587                    panic_guard.disarm();
588                    finish_delay_queue(&producer_shared, TerminalSignal::Error(error));
589                    return Ok(NotUsed);
590                }
591                None => {
592                    panic_guard.disarm();
593                    finish_delay_queue(&producer_shared, TerminalSignal::Complete);
594                    return Ok(NotUsed);
595                }
596            }
597        }
598    });
599
600    Ok(Box::new(DelayQueueStream {
601        shared,
602        completion: Some(completion),
603    }))
604}
605
606struct ThrottleStream<Out, CostFn> {
607    input: BoxStream<Out>,
608    materializer: Materializer,
609    token_bucket: TokenBucket,
610    cost_fn: Arc<CostFn>,
611    mode: ThrottleMode,
612    terminal: Option<TerminalSignal>,
613}
614
615impl<Out, CostFn> Iterator for ThrottleStream<Out, CostFn>
616where
617    Out: Send + 'static,
618    CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
619{
620    type Item = StreamResult<Out>;
621
622    fn next(&mut self) -> Option<Self::Item> {
623        if let Some(terminal) = self.terminal.clone() {
624            return match terminal {
625                TerminalSignal::Complete => None,
626                TerminalSignal::Error(error) => Some(Err(error)),
627            };
628        }
629
630        match self.input.next() {
631            Some(Ok(item)) => {
632                let cost = match catch_unwind(AssertUnwindSafe(|| (self.cost_fn)(&item))) {
633                    Ok(cost) => cost,
634                    Err(_) => {
635                        self.terminal = Some(TerminalSignal::Error(StreamError::AbruptTermination));
636                        return Some(Err(StreamError::AbruptTermination));
637                    }
638                };
639                let delay = self.token_bucket.offer(cost);
640                if delay.is_zero() {
641                    Some(Ok(item))
642                } else if self.mode == ThrottleMode::Enforcing {
643                    let error =
644                        StreamError::Failed("Maximum throttle throughput exceeded.".to_owned());
645                    self.terminal = Some(TerminalSignal::Error(error.clone()));
646                    Some(Err(error))
647                } else {
648                    match wait_for_timer(&self.materializer, delay) {
649                        Ok(()) => Some(Ok(item)),
650                        Err(error) => {
651                            self.terminal = Some(TerminalSignal::Error(error.clone()));
652                            Some(Err(error))
653                        }
654                    }
655                }
656            }
657            Some(Err(error)) => {
658                self.terminal = Some(TerminalSignal::Error(error.clone()));
659                Some(Err(error))
660            }
661            None => {
662                self.terminal = Some(TerminalSignal::Complete);
663                None
664            }
665        }
666    }
667}
668
669struct GroupedShared<T> {
670    state: Mutex<GroupedState<T>>,
671    available: Condvar,
672    cancelled: Arc<AtomicBool>,
673}
674
675struct GroupedState<T> {
676    ready: VecDeque<Vec<T>>,
677    current: Vec<T>,
678    current_weight: u64,
679    generation: u64,
680    terminal: Option<TerminalSignal>,
681}
682
683impl<T> GroupedShared<T> {
684    fn new() -> Arc<Self> {
685        Arc::new(Self {
686            state: Mutex::new(GroupedState {
687                ready: VecDeque::new(),
688                current: Vec::new(),
689                current_weight: 0,
690                generation: 0,
691                terminal: None,
692            }),
693            available: Condvar::new(),
694            cancelled: Arc::new(AtomicBool::new(false)),
695        })
696    }
697}
698
699struct GroupedStream<T> {
700    shared: Arc<GroupedShared<T>>,
701    completion: Option<StreamCompletion<NotUsed>>,
702}
703
704impl<T> Iterator for GroupedStream<T> {
705    type Item = StreamResult<Vec<T>>;
706
707    fn next(&mut self) -> Option<Self::Item> {
708        let mut state = self
709            .shared
710            .state
711            .lock()
712            .unwrap_or_else(|poison| poison.into_inner());
713        loop {
714            if let Some(group) = state.ready.pop_front() {
715                drop(state);
716                self.shared.available.notify_all();
717                return Some(Ok(group));
718            }
719            if let Some(terminal) = state.terminal.clone() {
720                return match terminal {
721                    TerminalSignal::Complete => None,
722                    TerminalSignal::Error(error) => Some(Err(error)),
723                };
724            }
725            if self.shared.cancelled.load(Ordering::SeqCst) {
726                return Some(Err(StreamError::Cancelled));
727            }
728            let (next, _) = self
729                .shared
730                .available
731                .wait_timeout(state, WAIT_POLL_INTERVAL)
732                .unwrap_or_else(|poison| poison.into_inner());
733            state = next;
734        }
735    }
736}
737
738impl<T> Drop for GroupedStream<T> {
739    fn drop(&mut self) {
740        self.shared.cancelled.store(true, Ordering::SeqCst);
741        self.shared.available.notify_all();
742        let _ = self.completion.take();
743    }
744}
745
746fn finish_grouped<T>(shared: &GroupedShared<T>, terminal: TerminalSignal) {
747    let mut state = shared
748        .state
749        .lock()
750        .unwrap_or_else(|poison| poison.into_inner());
751    if state.terminal.is_none() {
752        if !state.current.is_empty() {
753            let current = mem::take(&mut state.current);
754            state.ready.push_back(current);
755            state.current_weight = 0;
756            state.generation = state.generation.wrapping_add(1);
757        }
758        state.terminal = Some(terminal);
759    }
760    drop(state);
761    shared.available.notify_all();
762}
763
764fn arm_grouped_timer<T: Send + 'static>(
765    materializer: &Materializer,
766    shared: Arc<GroupedShared<T>>,
767    interval: Duration,
768    generation: u64,
769) {
770    let _timer = materializer.schedule_once(interval, move || {
771        let mut state = shared
772            .state
773            .lock()
774            .unwrap_or_else(|poison| poison.into_inner());
775        if state.terminal.is_some() || state.generation != generation || state.current.is_empty() {
776            return;
777        }
778        let current = mem::take(&mut state.current);
779        state.ready.push_back(current);
780        state.current_weight = 0;
781        state.generation = state.generation.wrapping_add(1);
782        drop(state);
783        shared.available.notify_all();
784    });
785}
786
787fn grouped_weighted_within_stage<Out, CostFn>(
788    input: BoxStream<Out>,
789    max_weight: u64,
790    max_number: usize,
791    interval: Duration,
792    cost_fn: Arc<CostFn>,
793    materializer: &Materializer,
794) -> StreamResult<BoxStream<Vec<Out>>>
795where
796    Out: Send + 'static,
797    CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
798{
799    assert!(
800        max_weight > 0,
801        "grouped_weighted_within max_weight must be greater than zero"
802    );
803    assert!(
804        max_number > 0,
805        "grouped_weighted_within max_number must be greater than zero"
806    );
807    assert!(
808        interval > Duration::ZERO,
809        "grouped_weighted_within interval must be greater than zero"
810    );
811
812    let shared = GroupedShared::new();
813    let producer_shared = Arc::clone(&shared);
814    let cancelled = Arc::clone(&shared.cancelled);
815    let state = Arc::clone(&materializer.inner.state);
816    let materializer = cloned_materializer(materializer);
817    let task_materializer = materializer.clone();
818    let completion = materializer.spawn_stream(move |_| {
819        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
820        loop {
821            if cancelled.load(Ordering::SeqCst) {
822                return Ok(NotUsed);
823            }
824
825            match input.next() {
826                Some(Ok(item)) => {
827                    let weight = match catch_unwind(AssertUnwindSafe(|| (cost_fn)(&item))) {
828                        Ok(weight) => weight,
829                        Err(_) => {
830                            finish_grouped(
831                                &producer_shared,
832                                TerminalSignal::Error(StreamError::AbruptTermination),
833                            );
834                            return Ok(NotUsed);
835                        }
836                    };
837
838                    let mut state = producer_shared
839                        .state
840                        .lock()
841                        .unwrap_or_else(|poison| poison.into_inner());
842                    if state.current.is_empty() {
843                        state.current.push(item);
844                        state.current_weight = weight;
845                        state.generation = state.generation.wrapping_add(1);
846                        let generation = state.generation;
847                        drop(state);
848                        arm_grouped_timer(
849                            &task_materializer,
850                            Arc::clone(&producer_shared),
851                            interval,
852                            generation,
853                        );
854                        producer_shared.available.notify_all();
855                        continue;
856                    }
857
858                    let fits = state.current_weight.saturating_add(weight) <= max_weight
859                        && state.current.len() < max_number;
860                    if fits {
861                        state.current.push(item);
862                        state.current_weight = state.current_weight.saturating_add(weight);
863                        if state.current_weight >= max_weight || state.current.len() >= max_number {
864                            let current = mem::take(&mut state.current);
865                            state.ready.push_back(current);
866                            state.current_weight = 0;
867                            state.generation = state.generation.wrapping_add(1);
868                        }
869                        drop(state);
870                        producer_shared.available.notify_all();
871                        continue;
872                    }
873
874                    let current = mem::take(&mut state.current);
875                    state.ready.push_back(current);
876                    state.current_weight = 0;
877                    state.generation = state.generation.wrapping_add(1);
878                    let heavy_alone = weight > max_weight;
879                    if heavy_alone {
880                        state.ready.push_back(vec![item]);
881                    } else {
882                        state.current.push(item);
883                        state.current_weight = weight;
884                        state.generation = state.generation.wrapping_add(1);
885                        let generation = state.generation;
886                        drop(state);
887                        arm_grouped_timer(
888                            &task_materializer,
889                            Arc::clone(&producer_shared),
890                            interval,
891                            generation,
892                        );
893                        producer_shared.available.notify_all();
894                        continue;
895                    }
896                    drop(state);
897                    producer_shared.available.notify_all();
898                }
899                Some(Err(error)) => {
900                    finish_grouped(&producer_shared, TerminalSignal::Error(error));
901                    return Ok(NotUsed);
902                }
903                None => {
904                    finish_grouped(&producer_shared, TerminalSignal::Complete);
905                    return Ok(NotUsed);
906                }
907            }
908        }
909    });
910
911    Ok(Box::new(GroupedStream {
912        shared,
913        completion: Some(completion),
914    }))
915}
916
917struct ForwardExtra {
918    generation: u64,
919}
920
921fn forward_slot_stage<Out, Setup, OnItem>(
922    input: BoxStream<Out>,
923    materializer: &Materializer,
924    setup: Setup,
925    on_item: OnItem,
926) -> StreamResult<BoxStream<Out>>
927where
928    Out: Send + 'static,
929    Setup:
930        FnOnce(Arc<SlotShared<Out, ForwardExtra>>, Arc<AtomicBool>, &Materializer) + Send + 'static,
931    OnItem: Fn(&Arc<SlotShared<Out, ForwardExtra>>, &Materializer, &Out) -> StreamResult<()>
932        + Send
933        + Sync
934        + 'static,
935{
936    let shared = SlotShared::new(ForwardExtra { generation: 0 });
937    let producer_shared = Arc::clone(&shared);
938    let cancelled = Arc::clone(&shared.cancelled);
939    let state = Arc::clone(&materializer.inner.state);
940    let materializer = cloned_materializer(materializer);
941    let task_materializer = materializer.clone();
942    setup(
943        Arc::clone(&producer_shared),
944        Arc::clone(&cancelled),
945        &materializer,
946    );
947    let on_item = Arc::new(on_item);
948    let completion = materializer.spawn_stream(move |_| {
949        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
950        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
951        loop {
952            if cancelled.load(Ordering::SeqCst) {
953                panic_guard.disarm();
954                return Ok(NotUsed);
955            }
956            match input.next() {
957                Some(Ok(item)) => {
958                    on_item(&producer_shared, &task_materializer, &item)?;
959                    let mut state = producer_shared
960                        .state
961                        .lock()
962                        .unwrap_or_else(|poison| poison.into_inner());
963                    while state.slot.is_some()
964                        && state.terminal.is_none()
965                        && !cancelled.load(Ordering::SeqCst)
966                    {
967                        state = producer_shared
968                            .available
969                            .wait(state)
970                            .unwrap_or_else(|poison| poison.into_inner());
971                    }
972                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
973                        panic_guard.disarm();
974                        return Ok(NotUsed);
975                    }
976                    state.slot = Some(item);
977                    state.extra.generation = state.extra.generation.wrapping_add(1);
978                    drop(state);
979                    producer_shared.available.notify_all();
980                }
981                Some(Err(error)) => {
982                    panic_guard.disarm();
983                    finish_slot(&producer_shared, TerminalSignal::Error(error));
984                    return Ok(NotUsed);
985                }
986                None => {
987                    panic_guard.disarm();
988                    finish_slot(&producer_shared, TerminalSignal::Complete);
989                    return Ok(NotUsed);
990                }
991            }
992        }
993    });
994
995    Ok(Box::new(SlotStream {
996        shared,
997        completion: Some(completion),
998    }))
999}
1000
1001fn take_within_stage<Out: Send + 'static>(
1002    input: BoxStream<Out>,
1003    timeout: Duration,
1004    materializer: &Materializer,
1005) -> StreamResult<BoxStream<Out>> {
1006    assert!(
1007        timeout > Duration::ZERO,
1008        "take_within timeout must be greater than zero"
1009    );
1010    forward_slot_stage(
1011        input,
1012        materializer,
1013        move |shared, cancelled, materializer| {
1014            let _timer = materializer.schedule_once(timeout, move || {
1015                cancelled.store(true, Ordering::SeqCst);
1016                finish_slot(&shared, TerminalSignal::Complete);
1017            });
1018        },
1019        |_, _, _| Ok(()),
1020    )
1021}
1022
1023fn initial_delay_stage<Out: Send + 'static>(
1024    input: BoxStream<Out>,
1025    delay: Duration,
1026    materializer: &Materializer,
1027) -> StreamResult<BoxStream<Out>> {
1028    assert!(delay >= Duration::ZERO);
1029    let materializer = cloned_materializer(materializer);
1030    Ok(Box::new(InitialDelayStream {
1031        input,
1032        materializer,
1033        opened: delay.is_zero(),
1034        delay,
1035        terminal: None,
1036    }))
1037}
1038
1039struct InitialDelayStream<Out> {
1040    input: BoxStream<Out>,
1041    materializer: Materializer,
1042    opened: bool,
1043    delay: Duration,
1044    terminal: Option<TerminalSignal>,
1045}
1046
1047impl<Out: Send + 'static> Iterator for InitialDelayStream<Out> {
1048    type Item = StreamResult<Out>;
1049
1050    fn next(&mut self) -> Option<Self::Item> {
1051        if let Some(terminal) = self.terminal.clone() {
1052            return match terminal {
1053                TerminalSignal::Complete => None,
1054                TerminalSignal::Error(error) => Some(Err(error)),
1055            };
1056        }
1057        if !self.opened {
1058            if let Err(error) = wait_for_timer(&self.materializer, self.delay) {
1059                self.terminal = Some(TerminalSignal::Error(error.clone()));
1060                return Some(Err(error));
1061            }
1062            self.opened = true;
1063        }
1064        match self.input.next() {
1065            Some(Ok(item)) => Some(Ok(item)),
1066            Some(Err(error)) => {
1067                self.terminal = Some(TerminalSignal::Error(error.clone()));
1068                Some(Err(error))
1069            }
1070            None => {
1071                self.terminal = Some(TerminalSignal::Complete);
1072                None
1073            }
1074        }
1075    }
1076}
1077
1078fn arm_generation_failure<Out: Send + 'static>(
1079    materializer: &Materializer,
1080    shared: Arc<SlotShared<Out, ForwardExtra>>,
1081    timeout: Duration,
1082    message: &'static str,
1083    generation: u64,
1084    require_empty_slot: bool,
1085) {
1086    let _timer = materializer.schedule_once(timeout, move || {
1087        let should_fail = {
1088            let state = shared
1089                .state
1090                .lock()
1091                .unwrap_or_else(|poison| poison.into_inner());
1092            state.terminal.is_none()
1093                && state.extra.generation == generation
1094                && (!require_empty_slot || state.slot.is_none())
1095        };
1096        if should_fail {
1097            shared.cancelled.store(true, Ordering::SeqCst);
1098            finish_slot(
1099                &shared,
1100                TerminalSignal::Error(StreamError::Failed(message.to_owned())),
1101            );
1102        }
1103    });
1104}
1105
1106fn initial_timeout_stage<Out: Send + 'static>(
1107    input: BoxStream<Out>,
1108    timeout: Duration,
1109    materializer: &Materializer,
1110) -> StreamResult<BoxStream<Out>> {
1111    assert!(
1112        timeout > Duration::ZERO,
1113        "initial_timeout timeout must be greater than zero"
1114    );
1115    let shared = SlotShared::new(ForwardExtra { generation: 0 });
1116    let producer_shared = Arc::clone(&shared);
1117    let cancelled = Arc::clone(&shared.cancelled);
1118    let state = Arc::clone(&materializer.inner.state);
1119    let materializer = cloned_materializer(materializer);
1120    arm_generation_failure(
1121        &materializer,
1122        Arc::clone(&producer_shared),
1123        timeout,
1124        "The first element has not yet passed through before the initial timeout elapsed.",
1125        0,
1126        true,
1127    );
1128    let completion = materializer.spawn_stream(move |_| {
1129        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1130        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1131        loop {
1132            if cancelled.load(Ordering::SeqCst) {
1133                panic_guard.disarm();
1134                return Ok(NotUsed);
1135            }
1136            match input.next() {
1137                Some(Ok(item)) => {
1138                    let mut state = producer_shared
1139                        .state
1140                        .lock()
1141                        .unwrap_or_else(|poison| poison.into_inner());
1142                    state.extra.generation = state.extra.generation.wrapping_add(1);
1143                    while state.slot.is_some()
1144                        && state.terminal.is_none()
1145                        && !cancelled.load(Ordering::SeqCst)
1146                    {
1147                        state = producer_shared
1148                            .available
1149                            .wait(state)
1150                            .unwrap_or_else(|poison| poison.into_inner());
1151                    }
1152                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1153                        panic_guard.disarm();
1154                        return Ok(NotUsed);
1155                    }
1156                    state.slot = Some(item);
1157                    drop(state);
1158                    producer_shared.available.notify_all();
1159                }
1160                Some(Err(error)) => {
1161                    panic_guard.disarm();
1162                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1163                    return Ok(NotUsed);
1164                }
1165                None => {
1166                    panic_guard.disarm();
1167                    finish_slot(&producer_shared, TerminalSignal::Complete);
1168                    return Ok(NotUsed);
1169                }
1170            }
1171        }
1172    });
1173
1174    Ok(Box::new(SlotStream {
1175        shared,
1176        completion: Some(completion),
1177    }))
1178}
1179
1180fn completion_timeout_stage<Out: Send + 'static>(
1181    input: BoxStream<Out>,
1182    timeout: Duration,
1183    materializer: &Materializer,
1184) -> StreamResult<BoxStream<Out>> {
1185    assert!(
1186        timeout > Duration::ZERO,
1187        "completion_timeout timeout must be greater than zero"
1188    );
1189    forward_slot_stage(
1190        input,
1191        materializer,
1192        move |shared, cancelled, materializer| {
1193            let _timer = materializer.schedule_once(timeout, move || {
1194                cancelled.store(true, Ordering::SeqCst);
1195                finish_slot(
1196                    &shared,
1197                    TerminalSignal::Error(StreamError::Failed(
1198                        "The stream has not been completed before the completion timeout elapsed."
1199                            .to_owned(),
1200                    )),
1201                );
1202            });
1203        },
1204        |_, _, _| Ok(()),
1205    )
1206}
1207
1208fn idle_timeout_stage<Out: Send + 'static>(
1209    input: BoxStream<Out>,
1210    timeout: Duration,
1211    materializer: &Materializer,
1212) -> StreamResult<BoxStream<Out>> {
1213    assert!(
1214        timeout > Duration::ZERO,
1215        "idle_timeout timeout must be greater than zero"
1216    );
1217    let shared = SlotShared::new(ForwardExtra { generation: 0 });
1218    let producer_shared = Arc::clone(&shared);
1219    let cancelled = Arc::clone(&shared.cancelled);
1220    let state = Arc::clone(&materializer.inner.state);
1221    let materializer = cloned_materializer(materializer);
1222    let task_materializer = materializer.clone();
1223    arm_generation_failure(
1224        &materializer,
1225        Arc::clone(&producer_shared),
1226        timeout,
1227        "No elements passed before the idle timeout elapsed.",
1228        0,
1229        false,
1230    );
1231    let completion = materializer.spawn_stream(move |_| {
1232        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1233        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1234        loop {
1235            if cancelled.load(Ordering::SeqCst) {
1236                panic_guard.disarm();
1237                return Ok(NotUsed);
1238            }
1239            match input.next() {
1240                Some(Ok(item)) => {
1241                    let mut state = producer_shared
1242                        .state
1243                        .lock()
1244                        .unwrap_or_else(|poison| poison.into_inner());
1245                    while state.slot.is_some()
1246                        && state.terminal.is_none()
1247                        && !cancelled.load(Ordering::SeqCst)
1248                    {
1249                        state = producer_shared
1250                            .available
1251                            .wait(state)
1252                            .unwrap_or_else(|poison| poison.into_inner());
1253                    }
1254                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1255                        panic_guard.disarm();
1256                        return Ok(NotUsed);
1257                    }
1258                    state.slot = Some(item);
1259                    state.extra.generation = state.extra.generation.wrapping_add(1);
1260                    let generation = state.extra.generation;
1261                    drop(state);
1262                    arm_generation_failure(
1263                        &task_materializer,
1264                        Arc::clone(&producer_shared),
1265                        timeout,
1266                        "No elements passed before the idle timeout elapsed.",
1267                        generation,
1268                        false,
1269                    );
1270                    producer_shared.available.notify_all();
1271                }
1272                Some(Err(error)) => {
1273                    panic_guard.disarm();
1274                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1275                    return Ok(NotUsed);
1276                }
1277                None => {
1278                    panic_guard.disarm();
1279                    finish_slot(&producer_shared, TerminalSignal::Complete);
1280                    return Ok(NotUsed);
1281                }
1282            }
1283        }
1284    });
1285
1286    Ok(Box::new(SlotStream {
1287        shared,
1288        completion: Some(completion),
1289    }))
1290}
1291
1292fn backpressure_timeout_stage<Out: Send + 'static>(
1293    input: BoxStream<Out>,
1294    timeout: Duration,
1295    materializer: &Materializer,
1296) -> StreamResult<BoxStream<Out>> {
1297    assert!(
1298        timeout > Duration::ZERO,
1299        "backpressure_timeout timeout must be greater than zero"
1300    );
1301    let shared = SlotShared::new(ForwardExtra { generation: 0 });
1302    let producer_shared = Arc::clone(&shared);
1303    let cancelled = Arc::clone(&shared.cancelled);
1304    let state = Arc::clone(&materializer.inner.state);
1305    let materializer = cloned_materializer(materializer);
1306    let task_materializer = materializer.clone();
1307    let completion = materializer.spawn_stream(move |_| {
1308        let schedule_timeout = |generation: u64, shared: Arc<SlotShared<Out, ForwardExtra>>| {
1309            let _timer = task_materializer.schedule_once(timeout, move || {
1310                let mut state = shared
1311                    .state
1312                    .lock()
1313                    .unwrap_or_else(|poison| poison.into_inner());
1314                if state.terminal.is_some() || state.extra.generation != generation {
1315                    return;
1316                }
1317                state.slot = None;
1318                state.terminal = Some(TerminalSignal::Error(StreamError::Failed(
1319                    "No downstream demand signalled before the backpressure timeout elapsed."
1320                        .to_owned(),
1321                )));
1322                drop(state);
1323                shared.cancelled.store(true, Ordering::SeqCst);
1324                shared.available.notify_all();
1325            });
1326        };
1327        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1328        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1329        loop {
1330            if cancelled.load(Ordering::SeqCst) {
1331                panic_guard.disarm();
1332                return Ok(NotUsed);
1333            }
1334            match input.next() {
1335                Some(Ok(item)) => {
1336                    let mut state = producer_shared
1337                        .state
1338                        .lock()
1339                        .unwrap_or_else(|poison| poison.into_inner());
1340                    while state.slot.is_some()
1341                        && state.terminal.is_none()
1342                        && !cancelled.load(Ordering::SeqCst)
1343                    {
1344                        state = producer_shared
1345                            .available
1346                            .wait(state)
1347                            .unwrap_or_else(|poison| poison.into_inner());
1348                    }
1349                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1350                        panic_guard.disarm();
1351                        return Ok(NotUsed);
1352                    }
1353                    state.slot = Some(item);
1354                    state.extra.generation = state.extra.generation.wrapping_add(1);
1355                    let generation = state.extra.generation;
1356                    drop(state);
1357                    schedule_timeout(generation, Arc::clone(&producer_shared));
1358                    producer_shared.available.notify_all();
1359                }
1360                Some(Err(error)) => {
1361                    panic_guard.disarm();
1362                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1363                    return Ok(NotUsed);
1364                }
1365                None => {
1366                    panic_guard.disarm();
1367                    finish_slot(&producer_shared, TerminalSignal::Complete);
1368                    return Ok(NotUsed);
1369                }
1370            }
1371        }
1372    });
1373
1374    struct BackpressureStream<Out> {
1375        shared: Arc<SlotShared<Out, ForwardExtra>>,
1376        completion: Option<StreamCompletion<NotUsed>>,
1377    }
1378
1379    impl<Out> Iterator for BackpressureStream<Out> {
1380        type Item = StreamResult<Out>;
1381
1382        fn next(&mut self) -> Option<Self::Item> {
1383            let mut state = self
1384                .shared
1385                .state
1386                .lock()
1387                .unwrap_or_else(|poison| poison.into_inner());
1388            loop {
1389                if let Some(item) = state.slot.take() {
1390                    state.extra.generation = state.extra.generation.wrapping_add(1);
1391                    drop(state);
1392                    self.shared.available.notify_all();
1393                    return Some(Ok(item));
1394                }
1395                if let Some(terminal) = state.terminal.clone() {
1396                    return match terminal {
1397                        TerminalSignal::Complete => None,
1398                        TerminalSignal::Error(error) => Some(Err(error)),
1399                    };
1400                }
1401                if self.shared.cancelled.load(Ordering::SeqCst) {
1402                    return Some(Err(StreamError::Cancelled));
1403                }
1404                let (next, _) = self
1405                    .shared
1406                    .available
1407                    .wait_timeout(state, WAIT_POLL_INTERVAL)
1408                    .unwrap_or_else(|poison| poison.into_inner());
1409                state = next;
1410            }
1411        }
1412    }
1413
1414    impl<Out> Drop for BackpressureStream<Out> {
1415        fn drop(&mut self) {
1416            self.shared.cancelled.store(true, Ordering::SeqCst);
1417            self.shared.available.notify_all();
1418            let _ = self.completion.take();
1419        }
1420    }
1421
1422    Ok(Box::new(BackpressureStream {
1423        shared,
1424        completion: Some(completion),
1425    }))
1426}
1427
1428struct KeepAliveExtra {
1429    generation: u64,
1430}
1431
1432fn arm_keep_alive_timer<Out, Inject>(
1433    materializer: &Materializer,
1434    shared: Arc<SlotShared<Out, KeepAliveExtra>>,
1435    timeout: Duration,
1436    generation: u64,
1437    inject: Arc<Inject>,
1438) where
1439    Out: Send + 'static,
1440    Inject: Fn() -> Out + Send + Sync + 'static,
1441{
1442    let materializer = cloned_materializer(materializer);
1443    let task_materializer = materializer.clone();
1444    let _timer = materializer.schedule_once(timeout, move || {
1445        let slot_occupied = {
1446            let state = shared
1447                .state
1448                .lock()
1449                .unwrap_or_else(|poison| poison.into_inner());
1450            if state.terminal.is_some() || state.extra.generation != generation {
1451                return;
1452            }
1453            state.slot.is_some()
1454        };
1455        if slot_occupied {
1456            arm_keep_alive_timer(
1457                &task_materializer,
1458                Arc::clone(&shared),
1459                timeout,
1460                generation,
1461                Arc::clone(&inject),
1462            );
1463            return;
1464        }
1465
1466        let injected = match catch_unwind(AssertUnwindSafe(|| inject())) {
1467            Ok(item) => item,
1468            Err(_) => {
1469                shared.cancelled.store(true, Ordering::SeqCst);
1470                finish_slot(
1471                    &shared,
1472                    TerminalSignal::Error(StreamError::AbruptTermination),
1473                );
1474                return;
1475            }
1476        };
1477
1478        let mut state = shared
1479            .state
1480            .lock()
1481            .unwrap_or_else(|poison| poison.into_inner());
1482        if state.terminal.is_some() || state.extra.generation != generation || state.slot.is_some()
1483        {
1484            return;
1485        }
1486        state.slot = Some(injected);
1487        state.extra.generation = state.extra.generation.wrapping_add(1);
1488        let next_generation = state.extra.generation;
1489        drop(state);
1490        shared.available.notify_all();
1491        arm_keep_alive_timer(
1492            &task_materializer,
1493            Arc::clone(&shared),
1494            timeout,
1495            next_generation,
1496            Arc::clone(&inject),
1497        );
1498    });
1499}
1500
1501fn keep_alive_stage<Out, Inject>(
1502    input: BoxStream<Out>,
1503    timeout: Duration,
1504    inject: Arc<Inject>,
1505    materializer: &Materializer,
1506) -> StreamResult<BoxStream<Out>>
1507where
1508    Out: Send + 'static,
1509    Inject: Fn() -> Out + Send + Sync + 'static,
1510{
1511    assert!(
1512        timeout > Duration::ZERO,
1513        "keep_alive timeout must be greater than zero"
1514    );
1515    let shared = SlotShared::new(KeepAliveExtra { generation: 0 });
1516    let producer_shared = Arc::clone(&shared);
1517    let cancelled = Arc::clone(&shared.cancelled);
1518    let state = Arc::clone(&materializer.inner.state);
1519    let materializer = cloned_materializer(materializer);
1520    let task_materializer = materializer.clone();
1521    arm_keep_alive_timer(
1522        &materializer,
1523        Arc::clone(&producer_shared),
1524        timeout,
1525        0,
1526        Arc::clone(&inject),
1527    );
1528    let completion = materializer.spawn_stream(move |_| {
1529        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1530        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1531        loop {
1532            if cancelled.load(Ordering::SeqCst) {
1533                panic_guard.disarm();
1534                return Ok(NotUsed);
1535            }
1536            match input.next() {
1537                Some(Ok(item)) => {
1538                    let mut state = producer_shared
1539                        .state
1540                        .lock()
1541                        .unwrap_or_else(|poison| poison.into_inner());
1542                    while state.slot.is_some()
1543                        && state.terminal.is_none()
1544                        && !cancelled.load(Ordering::SeqCst)
1545                    {
1546                        state = producer_shared
1547                            .available
1548                            .wait(state)
1549                            .unwrap_or_else(|poison| poison.into_inner());
1550                    }
1551                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1552                        panic_guard.disarm();
1553                        return Ok(NotUsed);
1554                    }
1555                    state.slot = Some(item);
1556                    state.extra.generation = state.extra.generation.wrapping_add(1);
1557                    let generation = state.extra.generation;
1558                    drop(state);
1559                    producer_shared.available.notify_all();
1560                    arm_keep_alive_timer(
1561                        &task_materializer,
1562                        Arc::clone(&producer_shared),
1563                        timeout,
1564                        generation,
1565                        Arc::clone(&inject),
1566                    );
1567                }
1568                Some(Err(error)) => {
1569                    panic_guard.disarm();
1570                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1571                    return Ok(NotUsed);
1572                }
1573                None => {
1574                    panic_guard.disarm();
1575                    finish_slot(&producer_shared, TerminalSignal::Complete);
1576                    return Ok(NotUsed);
1577                }
1578            }
1579        }
1580    });
1581
1582    Ok(Box::new(SlotStream {
1583        shared,
1584        completion: Some(completion),
1585    }))
1586}
1587
1588fn drop_within_stage<Out: Send + 'static>(
1589    input: BoxStream<Out>,
1590    timeout: Duration,
1591    materializer: &Materializer,
1592) -> StreamResult<BoxStream<Out>> {
1593    assert!(
1594        timeout > Duration::ZERO,
1595        "drop_within timeout must be greater than zero"
1596    );
1597    struct DropWithinExtra;
1598    let shared = SlotShared::new(DropWithinExtra);
1599    let producer_shared = Arc::clone(&shared);
1600    let cancelled = Arc::clone(&shared.cancelled);
1601    let state = Arc::clone(&materializer.inner.state);
1602    let materializer = cloned_materializer(materializer);
1603    let deadline = Instant::now() + timeout;
1604    let completion = materializer.spawn_stream(move |_| {
1605        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1606        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1607        loop {
1608            if cancelled.load(Ordering::SeqCst) {
1609                panic_guard.disarm();
1610                return Ok(NotUsed);
1611            }
1612            match input.next() {
1613                Some(Ok(item)) => {
1614                    if Instant::now() < deadline {
1615                        continue;
1616                    }
1617                    let mut state = producer_shared
1618                        .state
1619                        .lock()
1620                        .unwrap_or_else(|poison| poison.into_inner());
1621                    while state.slot.is_some()
1622                        && state.terminal.is_none()
1623                        && !cancelled.load(Ordering::SeqCst)
1624                    {
1625                        state = producer_shared
1626                            .available
1627                            .wait(state)
1628                            .unwrap_or_else(|poison| poison.into_inner());
1629                    }
1630                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1631                        panic_guard.disarm();
1632                        return Ok(NotUsed);
1633                    }
1634                    state.slot = Some(item);
1635                    drop(state);
1636                    producer_shared.available.notify_all();
1637                }
1638                Some(Err(error)) => {
1639                    panic_guard.disarm();
1640                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1641                    return Ok(NotUsed);
1642                }
1643                None => {
1644                    panic_guard.disarm();
1645                    finish_slot(&producer_shared, TerminalSignal::Complete);
1646                    return Ok(NotUsed);
1647                }
1648            }
1649        }
1650    });
1651
1652    Ok(Box::new(SlotStream {
1653        shared,
1654        completion: Some(completion),
1655    }))
1656}
1657
1658impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
1659    pub fn throttle(
1660        self,
1661        elements: u64,
1662        per: Duration,
1663        maximum_burst: i32,
1664        mode: ThrottleMode,
1665    ) -> Flow<In, Out, Mat> {
1666        self.throttle_with_cost(elements, per, maximum_burst, |_| 1, mode)
1667    }
1668
1669    pub fn throttle_with_cost<CostFn>(
1670        self,
1671        cost: u64,
1672        per: Duration,
1673        maximum_burst: i32,
1674        cost_fn: CostFn,
1675        mode: ThrottleMode,
1676    ) -> Flow<In, Out, Mat>
1677    where
1678        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1679    {
1680        let cost_fn = Arc::new(cost_fn);
1681        self.via(Flow::from_runtime_transform(move |input, materializer| {
1682            let materializer = cloned_materializer(materializer);
1683            Ok(Box::new(ThrottleStream {
1684                input,
1685                materializer,
1686                token_bucket: TokenBucket::new(cost, per, maximum_burst),
1687                cost_fn: Arc::clone(&cost_fn),
1688                mode,
1689                terminal: None,
1690            }))
1691        }))
1692    }
1693
1694    pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Flow<In, Out, Mat> {
1695        self.delay_with(move || move |_: &Out| delay, strategy)
1696    }
1697
1698    pub fn delay_with<Supplier, Strategy>(
1699        self,
1700        delay_strategy_supplier: Supplier,
1701        overflow_strategy: DelayOverflowStrategy,
1702    ) -> Flow<In, Out, Mat>
1703    where
1704        Supplier: Fn() -> Strategy + Send + Sync + 'static,
1705        Strategy: FnMut(&Out) -> Duration + Send + 'static,
1706    {
1707        let delay_strategy_supplier = Arc::new(delay_strategy_supplier);
1708        self.via(Flow::from_runtime_transform(move |input, materializer| {
1709            delay_stage(
1710                input,
1711                Arc::clone(&delay_strategy_supplier),
1712                overflow_strategy,
1713                materializer,
1714            )
1715        }))
1716    }
1717
1718    pub fn initial_delay(self, delay: Duration) -> Flow<In, Out, Mat> {
1719        self.via(Flow::from_runtime_transform(move |input, materializer| {
1720            initial_delay_stage(input, delay, materializer)
1721        }))
1722    }
1723
1724    pub fn grouped_within(self, max_number: usize, interval: Duration) -> Flow<In, Vec<Out>, Mat> {
1725        let unit_cost = Arc::new(|_: &Out| 1_u64);
1726        self.via(Flow::from_runtime_transform(move |input, materializer| {
1727            grouped_weighted_within_stage(
1728                input,
1729                max_number as u64,
1730                max_number,
1731                interval,
1732                Arc::clone(&unit_cost),
1733                materializer,
1734            )
1735        }))
1736    }
1737
1738    pub fn grouped_weighted_within<CostFn>(
1739        self,
1740        max_weight: u64,
1741        interval: Duration,
1742        cost_fn: CostFn,
1743    ) -> Flow<In, Vec<Out>, Mat>
1744    where
1745        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1746    {
1747        let cost_fn = Arc::new(cost_fn);
1748        self.via(Flow::from_runtime_transform(move |input, materializer| {
1749            grouped_weighted_within_stage(
1750                input,
1751                max_weight,
1752                usize::MAX,
1753                interval,
1754                Arc::clone(&cost_fn),
1755                materializer,
1756            )
1757        }))
1758    }
1759
1760    pub fn drop_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
1761        self.via(Flow::from_runtime_transform(move |input, materializer| {
1762            drop_within_stage(input, timeout, materializer)
1763        }))
1764    }
1765
1766    pub fn take_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
1767        self.via(Flow::from_runtime_transform(move |input, materializer| {
1768            take_within_stage(input, timeout, materializer)
1769        }))
1770    }
1771
1772    pub fn idle_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1773        self.via(Flow::from_runtime_transform(move |input, materializer| {
1774            idle_timeout_stage(input, timeout, materializer)
1775        }))
1776    }
1777
1778    pub fn backpressure_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1779        self.via(Flow::from_runtime_transform(move |input, materializer| {
1780            backpressure_timeout_stage(input, timeout, materializer)
1781        }))
1782    }
1783
1784    pub fn completion_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1785        self.via(Flow::from_runtime_transform(move |input, materializer| {
1786            completion_timeout_stage(input, timeout, materializer)
1787        }))
1788    }
1789
1790    pub fn initial_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1791        self.via(Flow::from_runtime_transform(move |input, materializer| {
1792            initial_timeout_stage(input, timeout, materializer)
1793        }))
1794    }
1795
1796    pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Flow<In, Out, Mat>
1797    where
1798        Inject: Fn() -> Out + Send + Sync + 'static,
1799    {
1800        let inject = Arc::new(inject);
1801        self.via(Flow::from_runtime_transform(move |input, materializer| {
1802            keep_alive_stage(input, timeout, Arc::clone(&inject), materializer)
1803        }))
1804    }
1805}
1806
1807impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
1808    pub fn throttle(
1809        self,
1810        elements: u64,
1811        per: Duration,
1812        maximum_burst: i32,
1813        mode: ThrottleMode,
1814    ) -> Self {
1815        self.via(Flow::identity().throttle(elements, per, maximum_burst, mode))
1816    }
1817
1818    pub fn throttle_with_cost<CostFn>(
1819        self,
1820        cost: u64,
1821        per: Duration,
1822        maximum_burst: i32,
1823        cost_fn: CostFn,
1824        mode: ThrottleMode,
1825    ) -> Self
1826    where
1827        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1828    {
1829        self.via(Flow::identity().throttle_with_cost(cost, per, maximum_burst, cost_fn, mode))
1830    }
1831
1832    pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Self {
1833        self.via(Flow::identity().delay(delay, strategy))
1834    }
1835
1836    pub fn delay_with<Supplier, Strategy>(
1837        self,
1838        delay_strategy_supplier: Supplier,
1839        overflow_strategy: DelayOverflowStrategy,
1840    ) -> Self
1841    where
1842        Supplier: Fn() -> Strategy + Send + Sync + 'static,
1843        Strategy: FnMut(&Out) -> Duration + Send + 'static,
1844    {
1845        self.via(Flow::identity().delay_with(delay_strategy_supplier, overflow_strategy))
1846    }
1847
1848    pub fn initial_delay(self, delay: Duration) -> Self {
1849        self.via(Flow::identity().initial_delay(delay))
1850    }
1851
1852    pub fn grouped_within(self, max_number: usize, interval: Duration) -> Source<Vec<Out>, Mat> {
1853        self.via(Flow::identity().grouped_within(max_number, interval))
1854    }
1855
1856    pub fn grouped_weighted_within<CostFn>(
1857        self,
1858        max_weight: u64,
1859        interval: Duration,
1860        cost_fn: CostFn,
1861    ) -> Source<Vec<Out>, Mat>
1862    where
1863        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1864    {
1865        self.via(Flow::identity().grouped_weighted_within(max_weight, interval, cost_fn))
1866    }
1867
1868    pub fn drop_within(self, timeout: Duration) -> Self {
1869        self.via(Flow::identity().drop_within(timeout))
1870    }
1871
1872    pub fn take_within(self, timeout: Duration) -> Self {
1873        self.via(Flow::identity().take_within(timeout))
1874    }
1875
1876    pub fn idle_timeout(self, timeout: Duration) -> Self {
1877        self.via(Flow::identity().idle_timeout(timeout))
1878    }
1879
1880    pub fn backpressure_timeout(self, timeout: Duration) -> Self {
1881        self.via(Flow::identity().backpressure_timeout(timeout))
1882    }
1883
1884    pub fn completion_timeout(self, timeout: Duration) -> Self {
1885        self.via(Flow::identity().completion_timeout(timeout))
1886    }
1887
1888    pub fn initial_timeout(self, timeout: Duration) -> Self {
1889        self.via(Flow::identity().initial_timeout(timeout))
1890    }
1891
1892    pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Self
1893    where
1894        Inject: Fn() -> Out + Send + Sync + 'static,
1895    {
1896        self.via(Flow::identity().keep_alive(timeout, inject))
1897    }
1898}
1899
1900impl<Out: Clone + Send + Sync + 'static> Source<Out, Cancellable> {
1901    pub fn tick(initial_delay: Duration, interval: Duration, element: Out) -> Self {
1902        assert!(
1903            interval > Duration::ZERO,
1904            "tick interval must be greater than zero"
1905        );
1906        Source::from_materialized_factory(move |materializer| {
1907            struct TickState {
1908                pending: bool,
1909                closed: bool,
1910            }
1911
1912            let shared = Arc::new((
1913                Mutex::new(TickState {
1914                    pending: false,
1915                    closed: false,
1916                }),
1917                Condvar::new(),
1918            ));
1919            let keep_alive: Arc<dyn Send + Sync> =
1920                Arc::clone(&materializer.inner) as Arc<dyn Send + Sync>;
1921            let cancellable = Cancellable::new_with_keep_alive(Some(keep_alive));
1922            let cancelled = Arc::clone(&cancellable.cancelled);
1923            let shutdown = Arc::clone(&materializer.inner.state.shutdown);
1924            let shared_task = Arc::clone(&shared);
1925            let timer =
1926                materializer.schedule_with_fixed_delay(initial_delay, interval, move || {
1927                    if cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst) {
1928                        return;
1929                    }
1930                    let (lock, condvar) = &*shared_task;
1931                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1932                    if !state.closed {
1933                        state.pending = true;
1934                    }
1935                    drop(state);
1936                    condvar.notify_all();
1937                });
1938
1939            struct TickStream<Out> {
1940                shared: Arc<(Mutex<TickState>, Condvar)>,
1941                timer: Cancellable,
1942                external: Cancellable,
1943                element: Out,
1944                shutdown: Arc<AtomicBool>,
1945            }
1946
1947            impl<Out: Clone> Iterator for TickStream<Out> {
1948                type Item = StreamResult<Out>;
1949
1950                fn next(&mut self) -> Option<Self::Item> {
1951                    let (lock, condvar) = &*self.shared;
1952                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1953                    loop {
1954                        if self.external.is_cancelled() || self.shutdown.load(Ordering::SeqCst) {
1955                            state.closed = true;
1956                            self.timer.cancel();
1957                            return None;
1958                        }
1959                        if state.pending {
1960                            state.pending = false;
1961                            return Some(Ok(self.element.clone()));
1962                        }
1963                        let (next, _) = condvar
1964                            .wait_timeout(state, WAIT_POLL_INTERVAL)
1965                            .unwrap_or_else(|poison| poison.into_inner());
1966                        state = next;
1967                    }
1968                }
1969            }
1970
1971            impl<Out> Drop for TickStream<Out> {
1972                fn drop(&mut self) {
1973                    let (lock, condvar) = &*self.shared;
1974                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1975                    state.closed = true;
1976                    drop(state);
1977                    self.timer.cancel();
1978                    condvar.notify_all();
1979                }
1980            }
1981
1982            Ok((
1983                Box::new(TickStream {
1984                    shared,
1985                    timer,
1986                    external: cancellable.clone(),
1987                    element: element.clone(),
1988                    shutdown: Arc::clone(&materializer.inner.state.shutdown),
1989                }) as BoxStream<Out>,
1990                cancellable,
1991            ))
1992        })
1993    }
1994}
1995
1996#[cfg(test)]
1997mod tests {
1998    use super::*;
1999    use crate::testkit::{TestSink, TestSource};
2000    use std::sync::mpsc;
2001    use std::thread;
2002
2003    const LOAD_TIMEOUT: Duration = Duration::from_millis(100);
2004    const LOAD_GAP: Duration = Duration::from_millis(250);
2005
2006    fn materialize_stream<T: Send + 'static, Mat: Send + 'static>(
2007        source: Source<T, Mat>,
2008    ) -> (BoxStream<T>, Materializer, Mat) {
2009        let materializer = Materializer::new();
2010        let (stream, mat) = Arc::clone(&source.factory)
2011            .create(&materializer)
2012            .expect("stream materializes");
2013        (stream, materializer, mat)
2014    }
2015
2016    #[test]
2017    fn throttle_shaping_spaces_elements() {
2018        let (tx, rx) = mpsc::channel();
2019        Source::from_iter(1..=3)
2020            .map(move |_| {
2021                tx.send(Instant::now()).unwrap();
2022            })
2023            .throttle(1, Duration::from_millis(40), 0, ThrottleMode::Shaping)
2024            .run_with(Sink::ignore())
2025            .unwrap()
2026            .wait()
2027            .unwrap();
2028
2029        let first = rx.recv_timeout(Duration::from_millis(250)).unwrap();
2030        let second = rx.recv_timeout(Duration::from_millis(250)).unwrap();
2031        let third = rx.recv_timeout(Duration::from_millis(250)).unwrap();
2032        assert!(second.duration_since(first) >= Duration::from_millis(20));
2033        assert!(third.duration_since(second) >= Duration::from_millis(20));
2034    }
2035
2036    #[test]
2037    fn throttle_enforcing_fails_when_rate_exceeded() {
2038        let result = Source::from_iter(1..=3)
2039            .throttle(1, Duration::from_millis(80), 1, ThrottleMode::Enforcing)
2040            .run_collect();
2041        assert_eq!(
2042            result,
2043            Err(StreamError::Failed(
2044                "Maximum throttle throughput exceeded.".to_owned()
2045            ))
2046        );
2047    }
2048
2049    #[test]
2050    fn delay_delays_first_element_without_reordering() {
2051        let start = Instant::now();
2052        let items = Source::from_iter([1, 2])
2053            .delay(
2054                Duration::from_millis(40),
2055                DelayOverflowStrategy::Backpressure,
2056            )
2057            .run_collect()
2058            .unwrap();
2059        assert_eq!(items, vec![1, 2]);
2060        assert!(start.elapsed() >= Duration::from_millis(20));
2061    }
2062
2063    #[test]
2064    fn delay_emit_early_preserves_elements_when_buffer_not_full() {
2065        let start = Instant::now();
2066        let items = Source::from_iter([1, 2])
2067            .delay(Duration::from_millis(40), DelayOverflowStrategy::EmitEarly)
2068            .run_collect()
2069            .unwrap();
2070        assert_eq!(items, vec![1, 2]);
2071        assert!(start.elapsed() >= Duration::from_millis(20));
2072    }
2073
2074    #[test]
2075    fn delay_emit_early_flushes_oldest_when_buffer_full() {
2076        let delay = Duration::from_secs(2);
2077        let (mut stream, materializer, _mat) = materialize_stream(
2078            Source::from_iter(1..=17).delay(delay, DelayOverflowStrategy::EmitEarly),
2079        );
2080
2081        let start = Instant::now();
2082        assert_eq!(stream.next(), Some(Ok(1)));
2083        assert!(
2084            start.elapsed() < Duration::from_millis(750),
2085            "EmitEarly should flush well before the configured delay under load"
2086        );
2087
2088        let mut items = vec![1];
2089        for item in stream {
2090            items.push(item.unwrap());
2091        }
2092        assert_eq!(items, (1..=17).collect::<Vec<_>>());
2093        materializer.shutdown();
2094    }
2095
2096    #[test]
2097    fn initial_delay_holds_back_first_pull() {
2098        let start = Instant::now();
2099        let result = Source::single(42)
2100            .initial_delay(Duration::from_millis(40))
2101            .run_with(Sink::head())
2102            .unwrap();
2103        assert_eq!(result.wait().unwrap(), 42);
2104        assert!(start.elapsed() >= Duration::from_millis(20));
2105    }
2106
2107    #[test]
2108    fn grouped_within_flushes_on_timer() {
2109        let (publisher, subscriber) = TestSource::probe::<i32>()
2110            .grouped_within(8, Duration::from_millis(40))
2111            .to_mat(TestSink::probe(), Keep::both)
2112            .run()
2113            .unwrap();
2114
2115        publisher.expect_request();
2116        publisher.send_next(1);
2117        publisher.expect_request();
2118        publisher.send_next(2);
2119
2120        subscriber.request(1);
2121        assert_eq!(subscriber.expect_next(), vec![1, 2]);
2122    }
2123
2124    #[test]
2125    fn grouped_weighted_within_flushes_on_weight() {
2126        let result = Source::from_iter(["aa", "bbb", "c"])
2127            .grouped_weighted_within(4, Duration::from_secs(1), |s| s.len() as u64)
2128            .run_collect()
2129            .unwrap();
2130        assert_eq!(result, vec![vec!["aa"], vec!["bbb", "c"]]);
2131    }
2132
2133    #[test]
2134    fn drop_within_drops_early_elements() {
2135        let (publisher, subscriber) = TestSource::probe::<i32>()
2136            .drop_within(Duration::from_millis(40))
2137            .to_mat(TestSink::probe(), Keep::both)
2138            .run()
2139            .unwrap();
2140
2141        publisher.expect_request();
2142        publisher.send_next(1);
2143        thread::sleep(Duration::from_millis(120));
2144        publisher.expect_request();
2145        publisher.send_next(2);
2146        publisher.send_complete();
2147
2148        subscriber.request(2);
2149        assert_eq!(subscriber.expect_next(), 2);
2150        subscriber.expect_complete();
2151    }
2152
2153    #[test]
2154    fn take_within_completes_after_timeout() {
2155        let (publisher, subscriber) = TestSource::probe::<i32>()
2156            .take_within(Duration::from_millis(40))
2157            .to_mat(TestSink::probe(), Keep::both)
2158            .run()
2159            .unwrap();
2160
2161        publisher.expect_request();
2162        publisher.send_next(1);
2163        subscriber.request(2);
2164        assert_eq!(subscriber.expect_next(), 1);
2165        subscriber.expect_complete();
2166    }
2167
2168    #[test]
2169    fn idle_timeout_fails_on_gap() {
2170        let (publisher, subscriber) = TestSource::probe::<i32>()
2171            .idle_timeout(Duration::from_millis(40))
2172            .to_mat(TestSink::probe(), Keep::both)
2173            .run()
2174            .unwrap();
2175
2176        publisher.expect_request();
2177        publisher.send_next(1);
2178        subscriber.request(2);
2179        assert_eq!(subscriber.expect_next(), 1);
2180        assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
2181    }
2182
2183    #[test]
2184    fn backpressure_timeout_fails_without_demand() {
2185        let (publisher, subscriber) = TestSource::probe::<i32>()
2186            .backpressure_timeout(LOAD_TIMEOUT)
2187            .to_mat(TestSink::probe(), Keep::both)
2188            .run()
2189            .unwrap();
2190
2191        subscriber.request(1);
2192        publisher.expect_request();
2193        publisher.send_next(1);
2194        assert_eq!(subscriber.expect_next(), 1);
2195        publisher.expect_request();
2196        publisher.send_next(2);
2197        thread::sleep(LOAD_GAP);
2198        subscriber.request(1);
2199        assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
2200    }
2201
2202    #[test]
2203    fn completion_timeout_fails_unfinished_stream() {
2204        let result = Source::<i32>::never()
2205            .completion_timeout(Duration::from_millis(40))
2206            .run_collect();
2207        assert!(matches!(result, Err(StreamError::Failed(_))));
2208    }
2209
2210    #[test]
2211    fn initial_timeout_fails_before_first_element() {
2212        let (mut stream, materializer, _mat) =
2213            materialize_stream(Source::<i32>::never().initial_timeout(Duration::from_millis(40)));
2214        assert!(matches!(stream.next(), Some(Err(StreamError::Failed(_)))));
2215        materializer.shutdown();
2216    }
2217
2218    #[test]
2219    fn keep_alive_injects_on_idle_gap() {
2220        let (publisher, subscriber) = TestSource::probe::<i32>()
2221            .keep_alive(LOAD_TIMEOUT, || 0)
2222            .to_mat(TestSink::probe(), Keep::both)
2223            .run()
2224            .unwrap();
2225
2226        subscriber.request(3);
2227        publisher.expect_request();
2228        publisher.send_next(1);
2229        assert_eq!(subscriber.expect_next(), 1);
2230        assert_eq!(subscriber.expect_next(), 0);
2231    }
2232
2233    #[test]
2234    fn keep_alive_rearms_after_slow_consumer_drains_slot() {
2235        let (publisher, subscriber) = TestSource::probe::<i32>()
2236            .keep_alive(LOAD_TIMEOUT, || 0)
2237            .to_mat(TestSink::probe(), Keep::both)
2238            .run()
2239            .unwrap();
2240
2241        subscriber.request(1);
2242        publisher.expect_request();
2243        publisher.send_next(1);
2244        assert_eq!(subscriber.expect_next(), 1);
2245
2246        publisher.expect_request();
2247        publisher.send_next(2);
2248        thread::sleep(LOAD_GAP);
2249
2250        subscriber.request(1);
2251        assert_eq!(subscriber.expect_next(), 2);
2252
2253        thread::sleep(LOAD_GAP);
2254        subscriber.request(1);
2255        assert_eq!(subscriber.expect_next(), 0);
2256    }
2257
2258    #[test]
2259    #[should_panic(expected = "maximum_burst must be -1 or greater")]
2260    fn throttle_rejects_invalid_negative_maximum_burst() {
2261        let _ = Source::single(1)
2262            .throttle(1, Duration::from_millis(40), -2, ThrottleMode::Shaping)
2263            .run_collect();
2264    }
2265
2266    #[test]
2267    fn tick_drops_missed_ticks_and_cancels() {
2268        let (mut stream, materializer, _cancellable) = materialize_stream(Source::tick(
2269            Duration::from_millis(20),
2270            Duration::from_millis(20),
2271            7_i32,
2272        ));
2273
2274        thread::sleep(Duration::from_millis(100));
2275        assert_eq!(stream.next(), Some(Ok(7)));
2276        assert_eq!(stream.next(), Some(Ok(7)));
2277        materializer.shutdown();
2278        assert_eq!(stream.next(), None);
2279    }
2280}