1use super::*;
9use crate::stream::runtime::current_stream_cancelled;
10use std::collections::VecDeque;
11use std::mem;
12use std::panic::{AssertUnwindSafe, catch_unwind};
13use std::time::Instant;
14
/// Maximum number of elements the delay stage keeps queued before its
/// `DelayOverflowStrategy` is applied.
const DELAY_BUFFER_CAPACITY: usize = 16;
/// Upper bound for a single timed condition-variable wait, so waiting loops
/// re-check cancellation, shutdown and deadlines at least this often.
const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
17
/// How a throttled stream reacts when an element arrives before the token
/// bucket can pay for it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThrottleMode {
    /// Hold the element back until the computed delay has elapsed, then emit it.
    Shaping,
    /// Fail the stream ("Maximum throttle throughput exceeded.") instead of waiting.
    Enforcing,
}
23
/// What the delay stage does when a new element arrives while its buffer
/// already holds `DELAY_BUFFER_CAPACITY` elements.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DelayOverflowStrategy {
    /// Make the oldest queued element due immediately, then wait for room.
    EmitEarly,
    /// Discard the oldest queued element to make room for the new one.
    DropHead,
    /// Discard the most recently queued element to make room for the new one.
    DropTail,
    /// Discard every queued element, then queue the new one.
    DropBuffer,
    /// Discard the incoming element and keep the buffer untouched.
    DropNew,
    /// Block the producer until the consumer has taken an element.
    Backpressure,
    /// Clear the buffer and fail the stream with a buffer-overflow error.
    Fail,
}
34
/// Final outcome recorded for a stage; once set, the consuming iterator
/// reports it after any buffered elements have been handed out.
#[derive(Clone)]
enum TerminalSignal {
    /// Upstream finished normally; the iterator yields `None`.
    Complete,
    /// The stage failed; the iterator yields `Some(Err(..))` with this error.
    Error(StreamError),
}
40
41struct DelayQueueShared<T> {
42 state: Mutex<DelayQueueState<T>>,
43 available: Condvar,
44 cancelled: Arc<AtomicBool>,
45}
46
47struct DelayQueueState<T> {
48 queue: VecDeque<(Instant, T)>,
49 terminal: Option<TerminalSignal>,
50}
51
52impl<T> DelayQueueShared<T> {
53 fn new() -> Arc<Self> {
54 Arc::new(Self {
55 state: Mutex::new(DelayQueueState {
56 queue: VecDeque::new(),
57 terminal: None,
58 }),
59 available: Condvar::new(),
60 cancelled: Arc::new(AtomicBool::new(false)),
61 })
62 }
63}
64
/// Consumer half of the delay stage: an iterator that hands out queued
/// elements once their deadline has passed.
struct DelayQueueStream<T> {
    /// Queue shared with the producer task.
    shared: Arc<DelayQueueShared<T>>,
    /// Completion handle of the spawned producer task; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
}
69
70impl<T> Iterator for DelayQueueStream<T> {
71 type Item = StreamResult<T>;
72
73 fn next(&mut self) -> Option<Self::Item> {
74 let mut state = self
75 .shared
76 .state
77 .lock()
78 .unwrap_or_else(|poison| poison.into_inner());
79 loop {
80 if let Some((deadline, _)) = state.queue.front() {
81 let now = Instant::now();
82 if *deadline <= now {
83 let (_, item) = state.queue.pop_front().expect("front element present");
84 drop(state);
85 self.shared.available.notify_all();
86 return Some(Ok(item));
87 }
88 let wait = (*deadline - now).min(WAIT_POLL_INTERVAL);
89 let (next, _) = self
90 .shared
91 .available
92 .wait_timeout(state, wait)
93 .unwrap_or_else(|poison| poison.into_inner());
94 state = next;
95 continue;
96 }
97
98 if let Some(terminal) = state.terminal.clone() {
99 return match terminal {
100 TerminalSignal::Complete => None,
101 TerminalSignal::Error(error) => Some(Err(error)),
102 };
103 }
104 if self.shared.cancelled.load(Ordering::SeqCst) {
105 return Some(Err(StreamError::Cancelled));
106 }
107
108 let (next, _) = self
109 .shared
110 .available
111 .wait_timeout(state, WAIT_POLL_INTERVAL)
112 .unwrap_or_else(|poison| poison.into_inner());
113 state = next;
114 }
115 }
116}
117
118impl<T> Drop for DelayQueueStream<T> {
119 fn drop(&mut self) {
120 self.shared.cancelled.store(true, Ordering::SeqCst);
121 self.shared.available.notify_all();
122 let _ = self.completion.take();
123 }
124}
125
126struct SlotShared<T, Extra> {
127 state: Mutex<SlotState<T, Extra>>,
128 available: Condvar,
129 cancelled: Arc<AtomicBool>,
130}
131
132struct SlotState<T, Extra> {
133 slot: Option<T>,
134 terminal: Option<TerminalSignal>,
135 extra: Extra,
136}
137
138impl<T, Extra> SlotShared<T, Extra> {
139 fn new(extra: Extra) -> Arc<Self> {
140 Arc::new(Self {
141 state: Mutex::new(SlotState {
142 slot: None,
143 terminal: None,
144 extra,
145 }),
146 available: Condvar::new(),
147 cancelled: Arc::new(AtomicBool::new(false)),
148 })
149 }
150}
151
/// Consumer half of a slot-based stage: an iterator over the elements the
/// producer places into the shared slot.
struct SlotStream<T, Extra> {
    /// Slot shared with the producer task.
    shared: Arc<SlotShared<T, Extra>>,
    /// Completion handle of the spawned producer task; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
}
156
157impl<T, Extra> Iterator for SlotStream<T, Extra> {
158 type Item = StreamResult<T>;
159
160 fn next(&mut self) -> Option<Self::Item> {
161 let mut state = self
162 .shared
163 .state
164 .lock()
165 .unwrap_or_else(|poison| poison.into_inner());
166 loop {
167 if let Some(item) = state.slot.take() {
168 drop(state);
169 self.shared.available.notify_all();
170 return Some(Ok(item));
171 }
172 if let Some(terminal) = state.terminal.clone() {
173 return match terminal {
174 TerminalSignal::Complete => None,
175 TerminalSignal::Error(error) => Some(Err(error)),
176 };
177 }
178 if self.shared.cancelled.load(Ordering::SeqCst) {
179 return Some(Err(StreamError::Cancelled));
180 }
181 let (next, _) = self
182 .shared
183 .available
184 .wait_timeout(state, WAIT_POLL_INTERVAL)
185 .unwrap_or_else(|poison| poison.into_inner());
186 state = next;
187 }
188 }
189}
190
191impl<T, Extra> Drop for SlotStream<T, Extra> {
192 fn drop(&mut self) {
193 self.shared.cancelled.store(true, Ordering::SeqCst);
194 self.shared.available.notify_all();
195 let _ = self.completion.take();
196 }
197}
198
199fn finish_delay_queue<T>(shared: &DelayQueueShared<T>, terminal: TerminalSignal) {
200 let mut state = shared
201 .state
202 .lock()
203 .unwrap_or_else(|poison| poison.into_inner());
204 if state.terminal.is_none() {
205 state.terminal = Some(terminal);
206 }
207 drop(state);
208 shared.available.notify_all();
209}
210
211fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
212 let mut state = shared
213 .state
214 .lock()
215 .unwrap_or_else(|poison| poison.into_inner());
216 if state.terminal.is_none() {
217 state.terminal = Some(terminal);
218 }
219 drop(state);
220 shared.available.notify_all();
221}
222
223struct QueuePanicGuard<T> {
224 shared: Arc<DelayQueueShared<T>>,
225 armed: bool,
226}
227
228impl<T> QueuePanicGuard<T> {
229 fn new(shared: Arc<DelayQueueShared<T>>) -> Self {
230 Self {
231 shared,
232 armed: true,
233 }
234 }
235
236 fn disarm(&mut self) {
237 self.armed = false;
238 }
239}
240
241impl<T> Drop for QueuePanicGuard<T> {
242 fn drop(&mut self) {
243 if self.armed {
244 finish_delay_queue(
245 &self.shared,
246 TerminalSignal::Error(StreamError::AbruptTermination),
247 );
248 }
249 }
250}
251
252struct SlotPanicGuard<T, Extra> {
253 shared: Arc<SlotShared<T, Extra>>,
254 armed: bool,
255}
256
257impl<T, Extra> SlotPanicGuard<T, Extra> {
258 fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
259 Self {
260 shared,
261 armed: true,
262 }
263 }
264
265 fn disarm(&mut self) {
266 self.armed = false;
267 }
268}
269
270impl<T, Extra> Drop for SlotPanicGuard<T, Extra> {
271 fn drop(&mut self) {
272 if self.armed {
273 finish_slot(
274 &self.shared,
275 TerminalSignal::Error(StreamError::AbruptTermination),
276 );
277 }
278 }
279}
280
281fn cloned_materializer(materializer: &Materializer) -> Materializer {
282 materializer.with_name_prefix(Arc::from(materializer.name_prefix()))
283}
284
285fn wait_for_timer(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
286 if delay.is_zero() {
287 return Ok(());
288 }
289 let gate = Arc::new((Mutex::new(false), Condvar::new()));
290 let gate_task = Arc::clone(&gate);
291 let _timer = materializer.schedule_once(delay, move || {
292 let (lock, condvar) = &*gate_task;
293 let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
294 *done = true;
295 drop(done);
296 condvar.notify_all();
297 });
298
299 let (lock, condvar) = &*gate;
300 let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
301 while !*done {
302 if materializer.is_shutdown() {
303 return Err(StreamError::AbruptTermination);
304 }
305 if current_stream_cancelled()
306 .as_ref()
307 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
308 {
309 return Err(StreamError::Cancelled);
310 }
311 let (next, _) = condvar
312 .wait_timeout(done, WAIT_POLL_INTERVAL)
313 .unwrap_or_else(|poison| poison.into_inner());
314 done = next;
315 }
316 Ok(())
317}
318
/// Token bucket used by the throttle stage to decide how long an element has
/// to wait before it may pass.
#[derive(Debug)]
struct TokenBucket {
    /// Tokens currently available (fractional while refilling).
    available: f64,
    /// Instant up to which refill has been accounted for. It lies in the
    /// future while a previously granted delay is still pending.
    last: Instant,
    /// Maximum number of tokens the bucket can hold.
    capacity: f64,
    /// Time, in nanoseconds, it takes for one token to be replenished.
    nanos_between_tokens: f64,
}

impl TokenBucket {
    /// Creates a full bucket that refills `cost` tokens every `per`.
    ///
    /// `maximum_burst` is the bucket capacity; `-1` selects an automatic
    /// capacity of the tokens produced in 100 ms, but at least one.
    ///
    /// # Panics
    ///
    /// Panics if `cost` is zero, `per` is zero, the rate exceeds one token per
    /// nanosecond, or `maximum_burst` is below `-1`.
    fn new(cost: u64, per: Duration, maximum_burst: i32) -> Self {
        assert!(cost > 0, "throttle cost must be greater than zero");
        assert!(
            per > Duration::ZERO,
            "throttle period must be greater than zero"
        );
        assert!(
            per.as_nanos() >= u128::from(cost),
            "Rates larger than 1 unit / nanosecond are not supported"
        );
        assert!(maximum_burst >= -1, "maximum_burst must be -1 or greater");
        let nanos_between_tokens = per.as_nanos() as f64 / cost as f64;
        let capacity = if maximum_burst == -1 {
            (100_000_000_f64 / nanos_between_tokens).floor().max(1.0)
        } else {
            f64::from(maximum_burst)
        };
        Self {
            available: capacity,
            last: Instant::now(),
            capacity,
            nanos_between_tokens,
        }
    }

    /// Requests `cost` tokens and returns how long the caller must wait before
    /// proceeding (`Duration::ZERO` when the tokens are available right away).
    ///
    /// If a previously returned delay has not fully elapsed yet, the remaining
    /// part is added to the new delay, so back-to-back calls cannot exceed the
    /// configured rate.
    fn offer(&mut self, cost: u64) -> Duration {
        let now = Instant::now();
        // Time still owed from an earlier delayed grant; zero once `last` is
        // in the past.
        let outstanding = self.last.saturating_duration_since(now);
        if now > self.last {
            let elapsed = now.duration_since(self.last).as_nanos() as f64;
            let replenished = elapsed / self.nanos_between_tokens;
            self.available = (self.available + replenished).min(self.capacity);
            self.last = now;
        }

        let cost = cost as f64;
        if self.available >= cost {
            self.available -= cost;
            return Duration::ZERO;
        }

        let needed = cost - self.available;
        self.available = 0.0;
        // The float-to-integer cast saturates, so huge costs cannot wrap.
        let refill = Duration::from_nanos((needed * self.nanos_between_tokens).ceil() as u64);
        let delay = outstanding.saturating_add(refill);
        // `Instant + Duration` panics on overflow; if the ready time is not
        // representable, keep the previous marker instead.
        if let Some(ready_at) = now.checked_add(delay) {
            self.last = ready_at;
        }
        delay
    }
}
376
377fn schedule_notify<T>(
378 materializer: &Materializer,
379 shared: Arc<DelayQueueShared<T>>,
380 delay: Duration,
381) where
382 T: Send + 'static,
383{
384 if delay.is_zero() {
385 shared.available.notify_all();
386 return;
387 }
388 let _timer = materializer.schedule_once(delay, move || {
389 shared.available.notify_all();
390 });
391}
392
393fn delay_stage<Out, Supplier, Strategy>(
394 input: BoxStream<Out>,
395 delay_strategy_supplier: Arc<Supplier>,
396 overflow_strategy: DelayOverflowStrategy,
397 materializer: &Materializer,
398) -> StreamResult<BoxStream<Out>>
399where
400 Out: Send + 'static,
401 Supplier: Fn() -> Strategy + Send + Sync + 'static,
402 Strategy: FnMut(&Out) -> Duration + Send + 'static,
403{
404 let shared = DelayQueueShared::new();
405 let producer_shared = Arc::clone(&shared);
406 let cancelled = Arc::clone(&shared.cancelled);
407 let state = Arc::clone(&materializer.inner.state);
408 let materializer = cloned_materializer(materializer);
409 let task_materializer = materializer.clone();
410 let completion = materializer.spawn_stream(move |_| {
411 let mut panic_guard = QueuePanicGuard::new(Arc::clone(&producer_shared));
412 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
413 let mut delay_strategy = delay_strategy_supplier();
414
415 loop {
416 if cancelled.load(Ordering::SeqCst) {
417 panic_guard.disarm();
418 return Ok(NotUsed);
419 }
420
421 match input.next() {
422 Some(Ok(item)) => {
423 let delay = match catch_unwind(AssertUnwindSafe(|| delay_strategy(&item))) {
424 Ok(delay) => delay,
425 Err(_) => {
426 panic_guard.disarm();
427 finish_delay_queue(
428 &producer_shared,
429 TerminalSignal::Error(StreamError::AbruptTermination),
430 );
431 return Ok(NotUsed);
432 }
433 };
434
435 let deadline = Instant::now() + delay;
436 let mut state = producer_shared
437 .state
438 .lock()
439 .unwrap_or_else(|poison| poison.into_inner());
440
441 match overflow_strategy {
442 DelayOverflowStrategy::Backpressure => {
443 while state.queue.len() == DELAY_BUFFER_CAPACITY
444 && !cancelled.load(Ordering::SeqCst)
445 {
446 state = producer_shared
447 .available
448 .wait(state)
449 .unwrap_or_else(|poison| poison.into_inner());
450 }
451 if cancelled.load(Ordering::SeqCst) {
452 panic_guard.disarm();
453 return Ok(NotUsed);
454 }
455 let was_empty = state.queue.is_empty();
456 state.queue.push_back((deadline, item));
457 drop(state);
458 if was_empty {
459 schedule_notify(
460 &task_materializer,
461 Arc::clone(&producer_shared),
462 delay,
463 );
464 }
465 producer_shared.available.notify_all();
466 }
467 DelayOverflowStrategy::DropHead => {
468 if state.queue.len() == DELAY_BUFFER_CAPACITY {
469 let _ = state.queue.pop_front();
470 }
471 let was_empty = state.queue.is_empty();
472 state.queue.push_back((deadline, item));
473 drop(state);
474 if was_empty {
475 schedule_notify(
476 &task_materializer,
477 Arc::clone(&producer_shared),
478 delay,
479 );
480 }
481 producer_shared.available.notify_all();
482 }
483 DelayOverflowStrategy::DropTail => {
484 if state.queue.len() == DELAY_BUFFER_CAPACITY {
485 let _ = state.queue.pop_back();
486 }
487 let was_empty = state.queue.is_empty();
488 state.queue.push_back((deadline, item));
489 drop(state);
490 if was_empty {
491 schedule_notify(
492 &task_materializer,
493 Arc::clone(&producer_shared),
494 delay,
495 );
496 }
497 producer_shared.available.notify_all();
498 }
499 DelayOverflowStrategy::DropBuffer => {
500 if state.queue.len() == DELAY_BUFFER_CAPACITY {
501 state.queue.clear();
502 }
503 let was_empty = state.queue.is_empty();
504 state.queue.push_back((deadline, item));
505 drop(state);
506 if was_empty {
507 schedule_notify(
508 &task_materializer,
509 Arc::clone(&producer_shared),
510 delay,
511 );
512 }
513 producer_shared.available.notify_all();
514 }
515 DelayOverflowStrategy::DropNew => {
516 if state.queue.len() < DELAY_BUFFER_CAPACITY {
517 let was_empty = state.queue.is_empty();
518 state.queue.push_back((deadline, item));
519 drop(state);
520 if was_empty {
521 schedule_notify(
522 &task_materializer,
523 Arc::clone(&producer_shared),
524 delay,
525 );
526 }
527 producer_shared.available.notify_all();
528 }
529 }
530 DelayOverflowStrategy::Fail => {
531 if state.queue.len() == DELAY_BUFFER_CAPACITY {
532 state.queue.clear();
533 drop(state);
534 panic_guard.disarm();
535 finish_delay_queue(
536 &producer_shared,
537 TerminalSignal::Error(StreamError::Failed(format!(
538 "Buffer overflow for delay operator (max capacity was: {DELAY_BUFFER_CAPACITY})!"
539 ))),
540 );
541 return Ok(NotUsed);
542 }
543 let was_empty = state.queue.is_empty();
544 state.queue.push_back((deadline, item));
545 drop(state);
546 if was_empty {
547 schedule_notify(
548 &task_materializer,
549 Arc::clone(&producer_shared),
550 delay,
551 );
552 }
553 producer_shared.available.notify_all();
554 }
555 DelayOverflowStrategy::EmitEarly => {
556 if state.queue.len() == DELAY_BUFFER_CAPACITY {
557 if let Some((early_deadline, _)) = state.queue.front_mut() {
558 *early_deadline = Instant::now();
559 }
560 drop(state);
561 producer_shared.available.notify_all();
562 state = producer_shared
563 .state
564 .lock()
565 .unwrap_or_else(|poison| poison.into_inner());
566 while state.queue.len() == DELAY_BUFFER_CAPACITY
567 && !cancelled.load(Ordering::SeqCst)
568 {
569 state = producer_shared
570 .available
571 .wait(state)
572 .unwrap_or_else(|poison| poison.into_inner());
573 }
574 if cancelled.load(Ordering::SeqCst) {
575 panic_guard.disarm();
576 return Ok(NotUsed);
577 }
578 }
579 let was_empty = state.queue.is_empty();
580 state.queue.push_back((deadline, item));
581 drop(state);
582 if was_empty {
583 schedule_notify(
584 &task_materializer,
585 Arc::clone(&producer_shared),
586 delay,
587 );
588 }
589 producer_shared.available.notify_all();
590 }
591 }
592 }
593 Some(Err(error)) => {
594 panic_guard.disarm();
595 finish_delay_queue(&producer_shared, TerminalSignal::Error(error));
596 return Ok(NotUsed);
597 }
598 None => {
599 panic_guard.disarm();
600 finish_delay_queue(&producer_shared, TerminalSignal::Complete);
601 return Ok(NotUsed);
602 }
603 }
604 }
605 });
606
607 Ok(Box::new(DelayQueueStream {
608 shared,
609 completion: Some(completion),
610 }))
611}
612
/// Iterator adapter that rate-limits `input` with a token bucket.
struct ThrottleStream<Out, CostFn> {
    /// Upstream elements.
    input: BoxStream<Out>,
    /// Used to schedule the timers that shaping waits on.
    materializer: Materializer,
    /// Tracks the available throughput budget.
    token_bucket: TokenBucket,
    /// Computes how many tokens an element costs.
    cost_fn: Arc<CostFn>,
    /// Whether to wait (`Shaping`) or fail (`Enforcing`) when over the rate.
    mode: ThrottleMode,
    /// Outcome to repeat on every call once the stream has ended.
    terminal: Option<TerminalSignal>,
}
621
622impl<Out, CostFn> Iterator for ThrottleStream<Out, CostFn>
623where
624 Out: Send + 'static,
625 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
626{
627 type Item = StreamResult<Out>;
628
629 fn next(&mut self) -> Option<Self::Item> {
630 if let Some(terminal) = self.terminal.clone() {
631 return match terminal {
632 TerminalSignal::Complete => None,
633 TerminalSignal::Error(error) => Some(Err(error)),
634 };
635 }
636
637 match self.input.next() {
638 Some(Ok(item)) => {
639 let cost = match catch_unwind(AssertUnwindSafe(|| (self.cost_fn)(&item))) {
640 Ok(cost) => cost,
641 Err(_) => {
642 self.terminal = Some(TerminalSignal::Error(StreamError::AbruptTermination));
643 return Some(Err(StreamError::AbruptTermination));
644 }
645 };
646 let delay = self.token_bucket.offer(cost);
647 if delay.is_zero() {
648 Some(Ok(item))
649 } else if self.mode == ThrottleMode::Enforcing {
650 let error =
651 StreamError::Failed("Maximum throttle throughput exceeded.".to_owned());
652 self.terminal = Some(TerminalSignal::Error(error.clone()));
653 Some(Err(error))
654 } else {
655 match wait_for_timer(&self.materializer, delay) {
656 Ok(()) => Some(Ok(item)),
657 Err(error) => {
658 self.terminal = Some(TerminalSignal::Error(error.clone()));
659 Some(Err(error))
660 }
661 }
662 }
663 }
664 Some(Err(error)) => {
665 self.terminal = Some(TerminalSignal::Error(error.clone()));
666 Some(Err(error))
667 }
668 None => {
669 self.terminal = Some(TerminalSignal::Complete);
670 None
671 }
672 }
673 }
674}
675
676struct GroupedShared<T> {
677 state: Mutex<GroupedState<T>>,
678 available: Condvar,
679 cancelled: Arc<AtomicBool>,
680}
681
682struct GroupedState<T> {
683 ready: VecDeque<Vec<T>>,
684 current: Vec<T>,
685 current_weight: u64,
686 generation: u64,
687 terminal: Option<TerminalSignal>,
688}
689
690impl<T> GroupedShared<T> {
691 fn new() -> Arc<Self> {
692 Arc::new(Self {
693 state: Mutex::new(GroupedState {
694 ready: VecDeque::new(),
695 current: Vec::new(),
696 current_weight: 0,
697 generation: 0,
698 terminal: None,
699 }),
700 available: Condvar::new(),
701 cancelled: Arc::new(AtomicBool::new(false)),
702 })
703 }
704}
705
/// Consumer half of the grouping stage: an iterator over closed groups.
struct GroupedStream<T> {
    /// Grouping state shared with the producer task.
    shared: Arc<GroupedShared<T>>,
    /// Completion handle of the spawned producer task; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
}
710
711impl<T> Iterator for GroupedStream<T> {
712 type Item = StreamResult<Vec<T>>;
713
714 fn next(&mut self) -> Option<Self::Item> {
715 let mut state = self
716 .shared
717 .state
718 .lock()
719 .unwrap_or_else(|poison| poison.into_inner());
720 loop {
721 if let Some(group) = state.ready.pop_front() {
722 drop(state);
723 self.shared.available.notify_all();
724 return Some(Ok(group));
725 }
726 if let Some(terminal) = state.terminal.clone() {
727 return match terminal {
728 TerminalSignal::Complete => None,
729 TerminalSignal::Error(error) => Some(Err(error)),
730 };
731 }
732 if self.shared.cancelled.load(Ordering::SeqCst) {
733 return Some(Err(StreamError::Cancelled));
734 }
735 let (next, _) = self
736 .shared
737 .available
738 .wait_timeout(state, WAIT_POLL_INTERVAL)
739 .unwrap_or_else(|poison| poison.into_inner());
740 state = next;
741 }
742 }
743}
744
impl<T> Drop for GroupedStream<T> {
    /// Signals cancellation to the producer task and releases its completion handle.
    fn drop(&mut self) {
        // Raise the flag first so anything woken by the notification sees it.
        self.shared.cancelled.store(true, Ordering::SeqCst);
        self.shared.available.notify_all();
        let _ = self.completion.take();
    }
}
752
753fn finish_grouped<T>(shared: &GroupedShared<T>, terminal: TerminalSignal) {
754 let mut state = shared
755 .state
756 .lock()
757 .unwrap_or_else(|poison| poison.into_inner());
758 if state.terminal.is_none() {
759 if !state.current.is_empty() {
760 let current = mem::take(&mut state.current);
761 state.ready.push_back(current);
762 state.current_weight = 0;
763 state.generation = state.generation.wrapping_add(1);
764 }
765 state.terminal = Some(terminal);
766 }
767 drop(state);
768 shared.available.notify_all();
769}
770
771fn arm_grouped_timer<T: Send + 'static>(
772 materializer: &Materializer,
773 shared: Arc<GroupedShared<T>>,
774 interval: Duration,
775 generation: u64,
776) {
777 let _timer = materializer.schedule_once(interval, move || {
778 let mut state = shared
779 .state
780 .lock()
781 .unwrap_or_else(|poison| poison.into_inner());
782 if state.terminal.is_some() || state.generation != generation || state.current.is_empty() {
783 return;
784 }
785 let current = mem::take(&mut state.current);
786 state.ready.push_back(current);
787 state.current_weight = 0;
788 state.generation = state.generation.wrapping_add(1);
789 drop(state);
790 shared.available.notify_all();
791 });
792}
793
794fn grouped_weighted_within_stage<Out, CostFn>(
795 input: BoxStream<Out>,
796 max_weight: u64,
797 max_number: usize,
798 interval: Duration,
799 cost_fn: Arc<CostFn>,
800 materializer: &Materializer,
801) -> StreamResult<BoxStream<Vec<Out>>>
802where
803 Out: Send + 'static,
804 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
805{
806 assert!(
807 max_weight > 0,
808 "grouped_weighted_within max_weight must be greater than zero"
809 );
810 assert!(
811 max_number > 0,
812 "grouped_weighted_within max_number must be greater than zero"
813 );
814 assert!(
815 interval > Duration::ZERO,
816 "grouped_weighted_within interval must be greater than zero"
817 );
818
819 let shared = GroupedShared::new();
820 let producer_shared = Arc::clone(&shared);
821 let cancelled = Arc::clone(&shared.cancelled);
822 let state = Arc::clone(&materializer.inner.state);
823 let materializer = cloned_materializer(materializer);
824 let task_materializer = materializer.clone();
825 let completion = materializer.spawn_stream(move |_| {
826 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
827 loop {
828 if cancelled.load(Ordering::SeqCst) {
829 return Ok(NotUsed);
830 }
831
832 match input.next() {
833 Some(Ok(item)) => {
834 let weight = match catch_unwind(AssertUnwindSafe(|| (cost_fn)(&item))) {
835 Ok(weight) => weight,
836 Err(_) => {
837 finish_grouped(
838 &producer_shared,
839 TerminalSignal::Error(StreamError::AbruptTermination),
840 );
841 return Ok(NotUsed);
842 }
843 };
844
845 let mut state = producer_shared
846 .state
847 .lock()
848 .unwrap_or_else(|poison| poison.into_inner());
849 if state.current.is_empty() {
850 state.current.push(item);
851 state.current_weight = weight;
852 state.generation = state.generation.wrapping_add(1);
853 let generation = state.generation;
854 drop(state);
855 arm_grouped_timer(
856 &task_materializer,
857 Arc::clone(&producer_shared),
858 interval,
859 generation,
860 );
861 producer_shared.available.notify_all();
862 continue;
863 }
864
865 let fits = state.current_weight.saturating_add(weight) <= max_weight
866 && state.current.len() < max_number;
867 if fits {
868 state.current.push(item);
869 state.current_weight = state.current_weight.saturating_add(weight);
870 if state.current_weight >= max_weight || state.current.len() >= max_number {
871 let current = mem::take(&mut state.current);
872 state.ready.push_back(current);
873 state.current_weight = 0;
874 state.generation = state.generation.wrapping_add(1);
875 }
876 drop(state);
877 producer_shared.available.notify_all();
878 continue;
879 }
880
881 let current = mem::take(&mut state.current);
882 state.ready.push_back(current);
883 state.current_weight = 0;
884 state.generation = state.generation.wrapping_add(1);
885 let heavy_alone = weight > max_weight;
886 if heavy_alone {
887 state.ready.push_back(vec![item]);
888 } else {
889 state.current.push(item);
890 state.current_weight = weight;
891 state.generation = state.generation.wrapping_add(1);
892 let generation = state.generation;
893 drop(state);
894 arm_grouped_timer(
895 &task_materializer,
896 Arc::clone(&producer_shared),
897 interval,
898 generation,
899 );
900 producer_shared.available.notify_all();
901 continue;
902 }
903 drop(state);
904 producer_shared.available.notify_all();
905 }
906 Some(Err(error)) => {
907 finish_grouped(&producer_shared, TerminalSignal::Error(error));
908 return Ok(NotUsed);
909 }
910 None => {
911 finish_grouped(&producer_shared, TerminalSignal::Complete);
912 return Ok(NotUsed);
913 }
914 }
915 }
916 });
917
918 Ok(Box::new(GroupedStream {
919 shared,
920 completion: Some(completion),
921 }))
922}
923
/// Extra slot state used by the forwarding and timeout stages.
struct ForwardExtra {
    /// Progress counter advanced by the producer as elements pass through;
    /// timeout timers compare it with the value they were armed with.
    generation: u64,
}
927
928fn forward_slot_stage<Out, Setup, OnItem>(
929 input: BoxStream<Out>,
930 materializer: &Materializer,
931 setup: Setup,
932 on_item: OnItem,
933) -> StreamResult<BoxStream<Out>>
934where
935 Out: Send + 'static,
936 Setup:
937 FnOnce(Arc<SlotShared<Out, ForwardExtra>>, Arc<AtomicBool>, &Materializer) + Send + 'static,
938 OnItem: Fn(&Arc<SlotShared<Out, ForwardExtra>>, &Materializer, &Out) -> StreamResult<()>
939 + Send
940 + Sync
941 + 'static,
942{
943 let shared = SlotShared::new(ForwardExtra { generation: 0 });
944 let producer_shared = Arc::clone(&shared);
945 let cancelled = Arc::clone(&shared.cancelled);
946 let state = Arc::clone(&materializer.inner.state);
947 let materializer = cloned_materializer(materializer);
948 let task_materializer = materializer.clone();
949 setup(
950 Arc::clone(&producer_shared),
951 Arc::clone(&cancelled),
952 &materializer,
953 );
954 let on_item = Arc::new(on_item);
955 let completion = materializer.spawn_stream(move |_| {
956 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
957 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
958 loop {
959 if cancelled.load(Ordering::SeqCst) {
960 panic_guard.disarm();
961 return Ok(NotUsed);
962 }
963 match input.next() {
964 Some(Ok(item)) => {
965 on_item(&producer_shared, &task_materializer, &item)?;
966 let mut state = producer_shared
967 .state
968 .lock()
969 .unwrap_or_else(|poison| poison.into_inner());
970 while state.slot.is_some()
971 && state.terminal.is_none()
972 && !cancelled.load(Ordering::SeqCst)
973 {
974 state = producer_shared
975 .available
976 .wait(state)
977 .unwrap_or_else(|poison| poison.into_inner());
978 }
979 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
980 panic_guard.disarm();
981 return Ok(NotUsed);
982 }
983 state.slot = Some(item);
984 state.extra.generation = state.extra.generation.wrapping_add(1);
985 drop(state);
986 producer_shared.available.notify_all();
987 }
988 Some(Err(error)) => {
989 panic_guard.disarm();
990 finish_slot(&producer_shared, TerminalSignal::Error(error));
991 return Ok(NotUsed);
992 }
993 None => {
994 panic_guard.disarm();
995 finish_slot(&producer_shared, TerminalSignal::Complete);
996 return Ok(NotUsed);
997 }
998 }
999 }
1000 });
1001
1002 Ok(Box::new(SlotStream {
1003 shared,
1004 completion: Some(completion),
1005 }))
1006}
1007
1008fn take_within_stage<Out: Send + 'static>(
1009 input: BoxStream<Out>,
1010 timeout: Duration,
1011 materializer: &Materializer,
1012) -> StreamResult<BoxStream<Out>> {
1013 assert!(
1014 timeout > Duration::ZERO,
1015 "take_within timeout must be greater than zero"
1016 );
1017 forward_slot_stage(
1018 input,
1019 materializer,
1020 move |shared, cancelled, materializer| {
1021 let _timer = materializer.schedule_once(timeout, move || {
1022 finish_slot(&shared, TerminalSignal::Complete);
1023 cancelled.store(true, Ordering::SeqCst);
1024 });
1025 },
1026 |_, _, _| Ok(()),
1027 )
1028}
1029
1030fn initial_delay_stage<Out: Send + 'static>(
1031 input: BoxStream<Out>,
1032 delay: Duration,
1033 materializer: &Materializer,
1034) -> StreamResult<BoxStream<Out>> {
1035 assert!(delay >= Duration::ZERO);
1036 let materializer = cloned_materializer(materializer);
1037 Ok(Box::new(InitialDelayStream {
1038 input,
1039 materializer,
1040 opened: delay.is_zero(),
1041 delay,
1042 terminal: None,
1043 }))
1044}
1045
/// Iterator adapter that delays the first pull from `input`.
struct InitialDelayStream<Out> {
    /// Upstream elements.
    input: BoxStream<Out>,
    /// Used to schedule the timer for the initial wait.
    materializer: Materializer,
    /// `true` once the initial delay has been served (or was zero).
    opened: bool,
    /// How long the first pull waits.
    delay: Duration,
    /// Outcome to repeat on every call once the stream has ended.
    terminal: Option<TerminalSignal>,
}
1053
1054impl<Out: Send + 'static> Iterator for InitialDelayStream<Out> {
1055 type Item = StreamResult<Out>;
1056
1057 fn next(&mut self) -> Option<Self::Item> {
1058 if let Some(terminal) = self.terminal.clone() {
1059 return match terminal {
1060 TerminalSignal::Complete => None,
1061 TerminalSignal::Error(error) => Some(Err(error)),
1062 };
1063 }
1064 if !self.opened {
1065 if let Err(error) = wait_for_timer(&self.materializer, self.delay) {
1066 self.terminal = Some(TerminalSignal::Error(error.clone()));
1067 return Some(Err(error));
1068 }
1069 self.opened = true;
1070 }
1071 match self.input.next() {
1072 Some(Ok(item)) => Some(Ok(item)),
1073 Some(Err(error)) => {
1074 self.terminal = Some(TerminalSignal::Error(error.clone()));
1075 Some(Err(error))
1076 }
1077 None => {
1078 self.terminal = Some(TerminalSignal::Complete);
1079 None
1080 }
1081 }
1082 }
1083}
1084
1085fn arm_generation_failure<Out: Send + 'static>(
1086 materializer: &Materializer,
1087 shared: Arc<SlotShared<Out, ForwardExtra>>,
1088 timeout: Duration,
1089 message: &'static str,
1090 generation: u64,
1091 require_empty_slot: bool,
1092) {
1093 let _timer = materializer.schedule_once(timeout, move || {
1094 let should_fail = {
1095 let state = shared
1096 .state
1097 .lock()
1098 .unwrap_or_else(|poison| poison.into_inner());
1099 state.terminal.is_none()
1100 && state.extra.generation == generation
1101 && (!require_empty_slot || state.slot.is_none())
1102 };
1103 if should_fail {
1104 finish_slot(
1105 &shared,
1106 TerminalSignal::Error(StreamError::Failed(message.to_owned())),
1107 );
1108 shared.cancelled.store(true, Ordering::SeqCst);
1109 }
1110 });
1111}
1112
1113fn initial_timeout_stage<Out: Send + 'static>(
1114 input: BoxStream<Out>,
1115 timeout: Duration,
1116 materializer: &Materializer,
1117) -> StreamResult<BoxStream<Out>> {
1118 assert!(
1119 timeout > Duration::ZERO,
1120 "initial_timeout timeout must be greater than zero"
1121 );
1122 let shared = SlotShared::new(ForwardExtra { generation: 0 });
1123 let producer_shared = Arc::clone(&shared);
1124 let cancelled = Arc::clone(&shared.cancelled);
1125 let state = Arc::clone(&materializer.inner.state);
1126 let materializer = cloned_materializer(materializer);
1127 arm_generation_failure(
1128 &materializer,
1129 Arc::clone(&producer_shared),
1130 timeout,
1131 "The first element has not yet passed through before the initial timeout elapsed.",
1132 0,
1133 true,
1134 );
1135 let completion = materializer.spawn_stream(move |_| {
1136 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1137 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1138 loop {
1139 if cancelled.load(Ordering::SeqCst) {
1140 panic_guard.disarm();
1141 return Ok(NotUsed);
1142 }
1143 match input.next() {
1144 Some(Ok(item)) => {
1145 let mut state = producer_shared
1146 .state
1147 .lock()
1148 .unwrap_or_else(|poison| poison.into_inner());
1149 state.extra.generation = state.extra.generation.wrapping_add(1);
1150 while state.slot.is_some()
1151 && state.terminal.is_none()
1152 && !cancelled.load(Ordering::SeqCst)
1153 {
1154 state = producer_shared
1155 .available
1156 .wait(state)
1157 .unwrap_or_else(|poison| poison.into_inner());
1158 }
1159 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1160 panic_guard.disarm();
1161 return Ok(NotUsed);
1162 }
1163 state.slot = Some(item);
1164 drop(state);
1165 producer_shared.available.notify_all();
1166 }
1167 Some(Err(error)) => {
1168 panic_guard.disarm();
1169 finish_slot(&producer_shared, TerminalSignal::Error(error));
1170 return Ok(NotUsed);
1171 }
1172 None => {
1173 panic_guard.disarm();
1174 finish_slot(&producer_shared, TerminalSignal::Complete);
1175 return Ok(NotUsed);
1176 }
1177 }
1178 }
1179 });
1180
1181 Ok(Box::new(SlotStream {
1182 shared,
1183 completion: Some(completion),
1184 }))
1185}
1186
1187fn completion_timeout_stage<Out: Send + 'static>(
1188 input: BoxStream<Out>,
1189 timeout: Duration,
1190 materializer: &Materializer,
1191) -> StreamResult<BoxStream<Out>> {
1192 assert!(
1193 timeout > Duration::ZERO,
1194 "completion_timeout timeout must be greater than zero"
1195 );
1196 forward_slot_stage(
1197 input,
1198 materializer,
1199 move |shared, cancelled, materializer| {
1200 let _timer = materializer.schedule_once(timeout, move || {
1201 finish_slot(
1202 &shared,
1203 TerminalSignal::Error(StreamError::Failed(
1204 "The stream has not been completed before the completion timeout elapsed."
1205 .to_owned(),
1206 )),
1207 );
1208 cancelled.store(true, Ordering::SeqCst);
1209 });
1210 },
1211 |_, _, _| Ok(()),
1212 )
1213}
1214
1215fn idle_timeout_stage<Out: Send + 'static>(
1216 input: BoxStream<Out>,
1217 timeout: Duration,
1218 materializer: &Materializer,
1219) -> StreamResult<BoxStream<Out>> {
1220 assert!(
1221 timeout > Duration::ZERO,
1222 "idle_timeout timeout must be greater than zero"
1223 );
1224 let shared = SlotShared::new(ForwardExtra { generation: 0 });
1225 let producer_shared = Arc::clone(&shared);
1226 let cancelled = Arc::clone(&shared.cancelled);
1227 let state = Arc::clone(&materializer.inner.state);
1228 let materializer = cloned_materializer(materializer);
1229 let task_materializer = materializer.clone();
1230 arm_generation_failure(
1231 &materializer,
1232 Arc::clone(&producer_shared),
1233 timeout,
1234 "No elements passed before the idle timeout elapsed.",
1235 0,
1236 false,
1237 );
1238 let completion = materializer.spawn_stream(move |_| {
1239 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1240 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1241 loop {
1242 if cancelled.load(Ordering::SeqCst) {
1243 panic_guard.disarm();
1244 return Ok(NotUsed);
1245 }
1246 match input.next() {
1247 Some(Ok(item)) => {
1248 let mut state = producer_shared
1249 .state
1250 .lock()
1251 .unwrap_or_else(|poison| poison.into_inner());
1252 while state.slot.is_some()
1253 && state.terminal.is_none()
1254 && !cancelled.load(Ordering::SeqCst)
1255 {
1256 state = producer_shared
1257 .available
1258 .wait(state)
1259 .unwrap_or_else(|poison| poison.into_inner());
1260 }
1261 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1262 panic_guard.disarm();
1263 return Ok(NotUsed);
1264 }
1265 state.slot = Some(item);
1266 state.extra.generation = state.extra.generation.wrapping_add(1);
1267 let generation = state.extra.generation;
1268 drop(state);
1269 arm_generation_failure(
1270 &task_materializer,
1271 Arc::clone(&producer_shared),
1272 timeout,
1273 "No elements passed before the idle timeout elapsed.",
1274 generation,
1275 false,
1276 );
1277 producer_shared.available.notify_all();
1278 }
1279 Some(Err(error)) => {
1280 panic_guard.disarm();
1281 finish_slot(&producer_shared, TerminalSignal::Error(error));
1282 return Ok(NotUsed);
1283 }
1284 None => {
1285 panic_guard.disarm();
1286 finish_slot(&producer_shared, TerminalSignal::Complete);
1287 return Ok(NotUsed);
1288 }
1289 }
1290 }
1291 });
1292
1293 Ok(Box::new(SlotStream {
1294 shared,
1295 completion: Some(completion),
1296 }))
1297}
1298
/// Builds the `backpressure_timeout` stage.
///
/// The producer task forwards upstream elements through a single-element slot. Each time
/// it places an element it bumps the slot generation and schedules a one-shot timer for
/// that generation. The consumer (`BackpressureStream`) bumps the generation again when it
/// takes the element, so a timer that finds its generation unchanged knows the element was
/// never taken: it clears the slot, records a `StreamError::Failed` terminal signal, sets
/// the cancellation flag and wakes all waiters.
///
/// # Panics
///
/// Panics if `timeout` is not greater than zero.
fn backpressure_timeout_stage<Out: Send + 'static>(
    input: BoxStream<Out>,
    timeout: Duration,
    materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
    assert!(
        timeout > Duration::ZERO,
        "backpressure_timeout timeout must be greater than zero"
    );
    let shared = SlotShared::new(ForwardExtra { generation: 0 });
    let producer_shared = Arc::clone(&shared);
    let cancelled = Arc::clone(&shared.cancelled);
    let state = Arc::clone(&materializer.inner.state);
    let materializer = cloned_materializer(materializer);
    let task_materializer = materializer.clone();
    let completion = materializer.spawn_stream(move |_| {
        // Arms a one-shot timer bound to `generation`; it only acts if the slot is still
        // at that generation and no terminal signal has been recorded.
        let schedule_timeout = |generation: u64, shared: Arc<SlotShared<Out, ForwardExtra>>| {
            let _timer = task_materializer.schedule_once(timeout, move || {
                let mut state = shared
                    .state
                    .lock()
                    .unwrap_or_else(|poison| poison.into_inner());
                if state.terminal.is_some() || state.extra.generation != generation {
                    return;
                }
                // The pending element is discarded; the consumer will see the error.
                state.slot = None;
                state.terminal = Some(TerminalSignal::Error(StreamError::Failed(
                    "No downstream demand signalled before the backpressure timeout elapsed."
                        .to_owned(),
                )));
                drop(state);
                shared.cancelled.store(true, Ordering::SeqCst);
                shared.available.notify_all();
            });
        };
        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
        loop {
            if cancelled.load(Ordering::SeqCst) {
                panic_guard.disarm();
                return Ok(NotUsed);
            }
            match input.next() {
                Some(Ok(item)) => {
                    let mut state = producer_shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    // Wait until the slot is free, the stream terminated, or the stage
                    // was cancelled.
                    while state.slot.is_some()
                        && state.terminal.is_none()
                        && !cancelled.load(Ordering::SeqCst)
                    {
                        state = producer_shared
                            .available
                            .wait(state)
                            .unwrap_or_else(|poison| poison.into_inner());
                    }
                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
                        panic_guard.disarm();
                        return Ok(NotUsed);
                    }
                    state.slot = Some(item);
                    state.extra.generation = state.extra.generation.wrapping_add(1);
                    let generation = state.extra.generation;
                    drop(state);
                    // Start the clock for this element before waking the consumer.
                    schedule_timeout(generation, Arc::clone(&producer_shared));
                    producer_shared.available.notify_all();
                }
                Some(Err(error)) => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Error(error));
                    return Ok(NotUsed);
                }
                None => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Complete);
                    return Ok(NotUsed);
                }
            }
        }
    });

    /// Consumer side of the stage: takes elements out of the shared slot.
    struct BackpressureStream<Out> {
        shared: Arc<SlotShared<Out, ForwardExtra>>,
        completion: Option<StreamCompletion<NotUsed>>,
    }

    impl<Out> Iterator for BackpressureStream<Out> {
        type Item = StreamResult<Out>;

        fn next(&mut self) -> Option<Self::Item> {
            let mut state = self
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            loop {
                if let Some(item) = state.slot.take() {
                    // Bumping the generation invalidates the timer armed for this element.
                    state.extra.generation = state.extra.generation.wrapping_add(1);
                    drop(state);
                    self.shared.available.notify_all();
                    return Some(Ok(item));
                }
                // A pending element is always delivered before a terminal signal.
                if let Some(terminal) = state.terminal.clone() {
                    return match terminal {
                        TerminalSignal::Complete => None,
                        TerminalSignal::Error(error) => Some(Err(error)),
                    };
                }
                if self.shared.cancelled.load(Ordering::SeqCst) {
                    return Some(Err(StreamError::Cancelled));
                }
                // Wait with a bounded timeout so the checks above are re-run at least
                // every `WAIT_POLL_INTERVAL`.
                let (next, _) = self
                    .shared
                    .available
                    .wait_timeout(state, WAIT_POLL_INTERVAL)
                    .unwrap_or_else(|poison| poison.into_inner());
                state = next;
            }
        }
    }

    impl<Out> Drop for BackpressureStream<Out> {
        fn drop(&mut self) {
            // Flag cancellation and wake the producer so it can stop waiting on the slot.
            self.shared.cancelled.store(true, Ordering::SeqCst);
            self.shared.available.notify_all();
            let _ = self.completion.take();
        }
    }

    Ok(Box::new(BackpressureStream {
        shared,
        completion: Some(completion),
    }))
}
1434
/// Extra slot state used by the keep-alive stage.
struct KeepAliveExtra {
    /// Bumped (with wrapping) whenever an element is placed into the slot; a keep-alive
    /// timer only acts while this still equals the generation it was armed with.
    generation: u64,
}
1438
/// Schedules a one-shot keep-alive timer bound to the slot `generation`.
///
/// When the timer fires:
/// - it does nothing if a terminal signal is recorded or the slot generation no longer
///   equals `generation`;
/// - if the slot still holds an element, it re-arms itself for the same generation;
/// - otherwise it calls `inject`, stores the result in the slot, bumps the generation,
///   wakes waiters and arms a new timer for the new generation.
///
/// A panic raised by `inject` is caught and reported through `finish_slot` as
/// `StreamError::AbruptTermination`, after which the shared cancellation flag is set.
fn arm_keep_alive_timer<Out, Inject>(
    materializer: &Materializer,
    shared: Arc<SlotShared<Out, KeepAliveExtra>>,
    timeout: Duration,
    generation: u64,
    inject: Arc<Inject>,
) where
    Out: Send + 'static,
    Inject: Fn() -> Out + Send + Sync + 'static,
{
    let materializer = cloned_materializer(materializer);
    let task_materializer = materializer.clone();
    let _timer = materializer.schedule_once(timeout, move || {
        // First lock phase: decide whether this timer is still relevant.
        let slot_occupied = {
            let state = shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            if state.terminal.is_some() || state.extra.generation != generation {
                return;
            }
            state.slot.is_some()
        };
        if slot_occupied {
            // The last element has not been taken yet; try again after another timeout.
            arm_keep_alive_timer(
                &task_materializer,
                Arc::clone(&shared),
                timeout,
                generation,
                Arc::clone(&inject),
            );
            return;
        }

        // `inject` runs without the slot lock held.
        let injected = match catch_unwind(AssertUnwindSafe(|| inject())) {
            Ok(item) => item,
            Err(_) => {
                finish_slot(
                    &shared,
                    TerminalSignal::Error(StreamError::AbruptTermination),
                );
                shared.cancelled.store(true, Ordering::SeqCst);
                return;
            }
        };

        // Second lock phase: re-check, since the slot may have changed while `inject` ran.
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        if state.terminal.is_some() || state.extra.generation != generation || state.slot.is_some()
        {
            return;
        }
        state.slot = Some(injected);
        state.extra.generation = state.extra.generation.wrapping_add(1);
        let next_generation = state.extra.generation;
        drop(state);
        shared.available.notify_all();
        arm_keep_alive_timer(
            &task_materializer,
            Arc::clone(&shared),
            timeout,
            next_generation,
            Arc::clone(&inject),
        );
    });
}
1507
1508fn keep_alive_stage<Out, Inject>(
1509 input: BoxStream<Out>,
1510 timeout: Duration,
1511 inject: Arc<Inject>,
1512 materializer: &Materializer,
1513) -> StreamResult<BoxStream<Out>>
1514where
1515 Out: Send + 'static,
1516 Inject: Fn() -> Out + Send + Sync + 'static,
1517{
1518 assert!(
1519 timeout > Duration::ZERO,
1520 "keep_alive timeout must be greater than zero"
1521 );
1522 let shared = SlotShared::new(KeepAliveExtra { generation: 0 });
1523 let producer_shared = Arc::clone(&shared);
1524 let cancelled = Arc::clone(&shared.cancelled);
1525 let state = Arc::clone(&materializer.inner.state);
1526 let materializer = cloned_materializer(materializer);
1527 let task_materializer = materializer.clone();
1528 arm_keep_alive_timer(
1529 &materializer,
1530 Arc::clone(&producer_shared),
1531 timeout,
1532 0,
1533 Arc::clone(&inject),
1534 );
1535 let completion = materializer.spawn_stream(move |_| {
1536 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1537 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1538 loop {
1539 if cancelled.load(Ordering::SeqCst) {
1540 panic_guard.disarm();
1541 return Ok(NotUsed);
1542 }
1543 match input.next() {
1544 Some(Ok(item)) => {
1545 let mut state = producer_shared
1546 .state
1547 .lock()
1548 .unwrap_or_else(|poison| poison.into_inner());
1549 while state.slot.is_some()
1550 && state.terminal.is_none()
1551 && !cancelled.load(Ordering::SeqCst)
1552 {
1553 state = producer_shared
1554 .available
1555 .wait(state)
1556 .unwrap_or_else(|poison| poison.into_inner());
1557 }
1558 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1559 panic_guard.disarm();
1560 return Ok(NotUsed);
1561 }
1562 state.slot = Some(item);
1563 state.extra.generation = state.extra.generation.wrapping_add(1);
1564 let generation = state.extra.generation;
1565 drop(state);
1566 producer_shared.available.notify_all();
1567 arm_keep_alive_timer(
1568 &task_materializer,
1569 Arc::clone(&producer_shared),
1570 timeout,
1571 generation,
1572 Arc::clone(&inject),
1573 );
1574 }
1575 Some(Err(error)) => {
1576 panic_guard.disarm();
1577 finish_slot(&producer_shared, TerminalSignal::Error(error));
1578 return Ok(NotUsed);
1579 }
1580 None => {
1581 panic_guard.disarm();
1582 finish_slot(&producer_shared, TerminalSignal::Complete);
1583 return Ok(NotUsed);
1584 }
1585 }
1586 }
1587 });
1588
1589 Ok(Box::new(SlotStream {
1590 shared,
1591 completion: Some(completion),
1592 }))
1593}
1594
1595fn drop_within_stage<Out: Send + 'static>(
1596 input: BoxStream<Out>,
1597 timeout: Duration,
1598 materializer: &Materializer,
1599) -> StreamResult<BoxStream<Out>> {
1600 assert!(
1601 timeout > Duration::ZERO,
1602 "drop_within timeout must be greater than zero"
1603 );
1604 struct DropWithinExtra;
1605 let shared = SlotShared::new(DropWithinExtra);
1606 let producer_shared = Arc::clone(&shared);
1607 let cancelled = Arc::clone(&shared.cancelled);
1608 let state = Arc::clone(&materializer.inner.state);
1609 let materializer = cloned_materializer(materializer);
1610 let deadline = Instant::now() + timeout;
1611 let completion = materializer.spawn_stream(move |_| {
1612 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1613 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1614 loop {
1615 if cancelled.load(Ordering::SeqCst) {
1616 panic_guard.disarm();
1617 return Ok(NotUsed);
1618 }
1619 match input.next() {
1620 Some(Ok(item)) => {
1621 if Instant::now() < deadline {
1622 continue;
1623 }
1624 let mut state = producer_shared
1625 .state
1626 .lock()
1627 .unwrap_or_else(|poison| poison.into_inner());
1628 while state.slot.is_some()
1629 && state.terminal.is_none()
1630 && !cancelled.load(Ordering::SeqCst)
1631 {
1632 state = producer_shared
1633 .available
1634 .wait(state)
1635 .unwrap_or_else(|poison| poison.into_inner());
1636 }
1637 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1638 panic_guard.disarm();
1639 return Ok(NotUsed);
1640 }
1641 state.slot = Some(item);
1642 drop(state);
1643 producer_shared.available.notify_all();
1644 }
1645 Some(Err(error)) => {
1646 panic_guard.disarm();
1647 finish_slot(&producer_shared, TerminalSignal::Error(error));
1648 return Ok(NotUsed);
1649 }
1650 None => {
1651 panic_guard.disarm();
1652 finish_slot(&producer_shared, TerminalSignal::Complete);
1653 return Ok(NotUsed);
1654 }
1655 }
1656 }
1657 });
1658
1659 Ok(Box::new(SlotStream {
1660 shared,
1661 completion: Some(completion),
1662 }))
1663}
1664
/// Time-based operators on `Flow`. Each operator appends a runtime transform to the flow
/// with `via`; the transform delegates to the matching stage function or stream type.
impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
    /// Throttles the flow with a cost of `1` per element.
    ///
    /// Shorthand for [`Flow::throttle_with_cost`] with a constant cost function.
    pub fn throttle(
        self,
        elements: u64,
        per: Duration,
        maximum_burst: i32,
        mode: ThrottleMode,
    ) -> Flow<In, Out, Mat> {
        self.throttle_with_cost(elements, per, maximum_burst, |_| 1, mode)
    }

    /// Throttles the flow, charging each element the cost returned by `cost_fn`.
    ///
    /// Each run of the transform wraps the input in a `ThrottleStream` with a fresh
    /// `TokenBucket` built from `cost`, `per` and `maximum_burst`.
    pub fn throttle_with_cost<CostFn>(
        self,
        cost: u64,
        per: Duration,
        maximum_burst: i32,
        cost_fn: CostFn,
        mode: ThrottleMode,
    ) -> Flow<In, Out, Mat>
    where
        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
    {
        // Shared so every run of the transform can hand its own handle to the stream.
        let cost_fn = Arc::new(cost_fn);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            let materializer = cloned_materializer(materializer);
            Ok(Box::new(ThrottleStream {
                input,
                materializer,
                token_bucket: TokenBucket::new(cost, per, maximum_burst),
                cost_fn: Arc::clone(&cost_fn),
                mode,
                terminal: None,
            }))
        }))
    }

    /// Delays every element by the same fixed `delay`.
    ///
    /// Shorthand for [`Flow::delay_with`] with a strategy that returns `delay` for every
    /// element.
    pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Flow<In, Out, Mat> {
        self.delay_with(move || move |_: &Out| delay, strategy)
    }

    /// Delays elements by a per-element duration, delegating to `delay_stage`.
    ///
    /// `delay_strategy_supplier` and `overflow_strategy` are passed to `delay_stage`
    /// unchanged (the supplier behind an `Arc`).
    pub fn delay_with<Supplier, Strategy>(
        self,
        delay_strategy_supplier: Supplier,
        overflow_strategy: DelayOverflowStrategy,
    ) -> Flow<In, Out, Mat>
    where
        Supplier: Fn() -> Strategy + Send + Sync + 'static,
        Strategy: FnMut(&Out) -> Duration + Send + 'static,
    {
        let delay_strategy_supplier = Arc::new(delay_strategy_supplier);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            delay_stage(
                input,
                Arc::clone(&delay_strategy_supplier),
                overflow_strategy,
                materializer,
            )
        }))
    }

    /// Routes the flow through `initial_delay_stage` with the given `delay`.
    pub fn initial_delay(self, delay: Duration) -> Flow<In, Out, Mat> {
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            initial_delay_stage(input, delay, materializer)
        }))
    }

    /// Groups elements into `Vec`s via `grouped_weighted_within_stage` with a cost of `1`
    /// per element.
    ///
    /// `max_number` is passed both as the `u64` limit and as the `usize` limit of the
    /// stage (presumably the weight limit and the element-count limit).
    pub fn grouped_within(self, max_number: usize, interval: Duration) -> Flow<In, Vec<Out>, Mat> {
        let unit_cost = Arc::new(|_: &Out| 1_u64);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            grouped_weighted_within_stage(
                input,
                max_number as u64,
                max_number,
                interval,
                Arc::clone(&unit_cost),
                materializer,
            )
        }))
    }

    /// Groups elements into `Vec`s via `grouped_weighted_within_stage`, weighing each
    /// element with `cost_fn`.
    ///
    /// The stage's `usize` limit is set to `usize::MAX`.
    pub fn grouped_weighted_within<CostFn>(
        self,
        max_weight: u64,
        interval: Duration,
        cost_fn: CostFn,
    ) -> Flow<In, Vec<Out>, Mat>
    where
        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
    {
        let cost_fn = Arc::new(cost_fn);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            grouped_weighted_within_stage(
                input,
                max_weight,
                usize::MAX,
                interval,
                Arc::clone(&cost_fn),
                materializer,
            )
        }))
    }

    /// Routes the flow through `drop_within_stage`, which discards elements arriving
    /// before its deadline (`timeout` after the stage is built).
    pub fn drop_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            drop_within_stage(input, timeout, materializer)
        }))
    }

    /// Routes the flow through `take_within_stage` with the given `timeout`.
    pub fn take_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            take_within_stage(input, timeout, materializer)
        }))
    }

    /// Routes the flow through `idle_timeout_stage` with the given `timeout`.
    pub fn idle_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            idle_timeout_stage(input, timeout, materializer)
        }))
    }

    /// Routes the flow through `backpressure_timeout_stage`, which fails the stream when
    /// a forwarded element is not taken by the consumer within `timeout`.
    pub fn backpressure_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            backpressure_timeout_stage(input, timeout, materializer)
        }))
    }

    /// Routes the flow through `completion_timeout_stage` with the given `timeout`.
    pub fn completion_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            completion_timeout_stage(input, timeout, materializer)
        }))
    }

    /// Routes the flow through `initial_timeout_stage` with the given `timeout`.
    pub fn initial_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            initial_timeout_stage(input, timeout, materializer)
        }))
    }

    /// Routes the flow through `keep_alive_stage`, which uses `inject` to produce the
    /// elements it inserts when its keep-alive timer fires on an empty slot.
    pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Flow<In, Out, Mat>
    where
        Inject: Fn() -> Out + Send + Sync + 'static,
    {
        let inject = Arc::new(inject);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            keep_alive_stage(input, timeout, Arc::clone(&inject), materializer)
        }))
    }
}
1813
/// Time-based operators on `Source`. Every method applies the `Flow` operator of the same
/// name to `Flow::identity()` and attaches the result with `via`.
impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
    /// Applies `Flow::throttle` to this source.
    pub fn throttle(
        self,
        elements: u64,
        per: Duration,
        maximum_burst: i32,
        mode: ThrottleMode,
    ) -> Self {
        self.via(Flow::identity().throttle(elements, per, maximum_burst, mode))
    }

    /// Applies `Flow::throttle_with_cost` to this source.
    pub fn throttle_with_cost<CostFn>(
        self,
        cost: u64,
        per: Duration,
        maximum_burst: i32,
        cost_fn: CostFn,
        mode: ThrottleMode,
    ) -> Self
    where
        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
    {
        self.via(Flow::identity().throttle_with_cost(cost, per, maximum_burst, cost_fn, mode))
    }

    /// Applies `Flow::delay` to this source.
    pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Self {
        self.via(Flow::identity().delay(delay, strategy))
    }

    /// Applies `Flow::delay_with` to this source.
    pub fn delay_with<Supplier, Strategy>(
        self,
        delay_strategy_supplier: Supplier,
        overflow_strategy: DelayOverflowStrategy,
    ) -> Self
    where
        Supplier: Fn() -> Strategy + Send + Sync + 'static,
        Strategy: FnMut(&Out) -> Duration + Send + 'static,
    {
        self.via(Flow::identity().delay_with(delay_strategy_supplier, overflow_strategy))
    }

    /// Applies `Flow::initial_delay` to this source.
    pub fn initial_delay(self, delay: Duration) -> Self {
        self.via(Flow::identity().initial_delay(delay))
    }

    /// Applies `Flow::grouped_within` to this source; the element type becomes `Vec<Out>`.
    pub fn grouped_within(self, max_number: usize, interval: Duration) -> Source<Vec<Out>, Mat> {
        self.via(Flow::identity().grouped_within(max_number, interval))
    }

    /// Applies `Flow::grouped_weighted_within` to this source; the element type becomes
    /// `Vec<Out>`.
    pub fn grouped_weighted_within<CostFn>(
        self,
        max_weight: u64,
        interval: Duration,
        cost_fn: CostFn,
    ) -> Source<Vec<Out>, Mat>
    where
        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
    {
        self.via(Flow::identity().grouped_weighted_within(max_weight, interval, cost_fn))
    }

    /// Applies `Flow::drop_within` to this source.
    pub fn drop_within(self, timeout: Duration) -> Self {
        self.via(Flow::identity().drop_within(timeout))
    }

    /// Applies `Flow::take_within` to this source.
    pub fn take_within(self, timeout: Duration) -> Self {
        self.via(Flow::identity().take_within(timeout))
    }

    /// Applies `Flow::idle_timeout` to this source.
    pub fn idle_timeout(self, timeout: Duration) -> Self {
        self.via(Flow::identity().idle_timeout(timeout))
    }

    /// Applies `Flow::backpressure_timeout` to this source.
    pub fn backpressure_timeout(self, timeout: Duration) -> Self {
        self.via(Flow::identity().backpressure_timeout(timeout))
    }

    /// Applies `Flow::completion_timeout` to this source.
    pub fn completion_timeout(self, timeout: Duration) -> Self {
        self.via(Flow::identity().completion_timeout(timeout))
    }

    /// Applies `Flow::initial_timeout` to this source.
    pub fn initial_timeout(self, timeout: Duration) -> Self {
        self.via(Flow::identity().initial_timeout(timeout))
    }

    /// Applies `Flow::keep_alive` to this source.
    pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Self
    where
        Inject: Fn() -> Out + Send + Sync + 'static,
    {
        self.via(Flow::identity().keep_alive(timeout, inject))
    }
}
1906
impl<Out: Clone + Send + Sync + 'static> Source<Out, Cancellable> {
    /// Creates a source that emits clones of `element` whenever its timer has fired.
    ///
    /// The timer is registered with `schedule_with_fixed_delay(initial_delay, interval, ..)`
    /// and only sets a boolean `pending` flag, so several timer firings between two pulls
    /// yield a single element. The materialized `Cancellable` ends the stream: once it is
    /// cancelled, or the materializer's shutdown flag is set, `next` returns `None`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is not greater than zero.
    pub fn tick(initial_delay: Duration, interval: Duration, element: Out) -> Self {
        assert!(
            interval > Duration::ZERO,
            "tick interval must be greater than zero"
        );
        Source::from_materialized_factory(move |materializer| {
            /// State shared between the timer callback and the stream.
            struct TickState {
                // Set by the timer, cleared when the stream emits an element.
                pending: bool,
                // Set once the stream has ended or been dropped; the timer then stops
                // marking ticks as pending.
                closed: bool,
            }

            let shared = Arc::new((
                Mutex::new(TickState {
                    pending: false,
                    closed: false,
                }),
                Condvar::new(),
            ));
            // The cancellable holds a reference to the materializer internals.
            let keep_alive: Arc<dyn Send + Sync> =
                Arc::clone(&materializer.inner) as Arc<dyn Send + Sync>;
            let cancellable = Cancellable::new_with_keep_alive(Some(keep_alive));
            let cancelled = Arc::clone(&cancellable.cancelled);
            let shutdown = Arc::clone(&materializer.inner.state.shutdown);
            let shared_task = Arc::clone(&shared);
            let timer =
                materializer.schedule_with_fixed_delay(initial_delay, interval, move || {
                    if cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst) {
                        return;
                    }
                    let (lock, condvar) = &*shared_task;
                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
                    if !state.closed {
                        state.pending = true;
                    }
                    drop(state);
                    condvar.notify_all();
                });

            /// Stream side of the tick source.
            struct TickStream<Out> {
                shared: Arc<(Mutex<TickState>, Condvar)>,
                timer: Cancellable,
                external: Cancellable,
                element: Out,
                shutdown: Arc<AtomicBool>,
            }

            impl<Out: Clone> Iterator for TickStream<Out> {
                type Item = StreamResult<Out>;

                fn next(&mut self) -> Option<Self::Item> {
                    let (lock, condvar) = &*self.shared;
                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
                    loop {
                        // Cancellation and shutdown take priority over a pending tick.
                        if self.external.is_cancelled() || self.shutdown.load(Ordering::SeqCst) {
                            state.closed = true;
                            self.timer.cancel();
                            return None;
                        }
                        if state.pending {
                            state.pending = false;
                            return Some(Ok(self.element.clone()));
                        }
                        // Bounded wait so the cancellation check above is re-run at least
                        // every `WAIT_POLL_INTERVAL`.
                        let (next, _) = condvar
                            .wait_timeout(state, WAIT_POLL_INTERVAL)
                            .unwrap_or_else(|poison| poison.into_inner());
                        state = next;
                    }
                }
            }

            impl<Out> Drop for TickStream<Out> {
                fn drop(&mut self) {
                    let (lock, condvar) = &*self.shared;
                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
                    state.closed = true;
                    drop(state);
                    self.timer.cancel();
                    condvar.notify_all();
                }
            }

            Ok((
                Box::new(TickStream {
                    shared,
                    timer,
                    external: cancellable.clone(),
                    element: element.clone(),
                    shutdown: Arc::clone(&materializer.inner.state.shutdown),
                }) as BoxStream<Out>,
                cancellable,
            ))
        })
    }
}
2002
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testkit::{TestSink, TestSource};
    use std::sync::mpsc;
    use std::thread;

    // Timeout handed to the stages in the slow-consumer tests, and the (longer) pause the
    // tests sleep for so that the timeout has elapsed.
    const LOAD_TIMEOUT: Duration = Duration::from_millis(100);
    const LOAD_GAP: Duration = Duration::from_millis(250);

    /// Materializes `source` directly through its factory and returns the raw stream, the
    /// materializer that owns it and the materialized value.
    fn materialize_stream<T: Send + 'static, Mat: Send + 'static>(
        source: Source<T, Mat>,
    ) -> (BoxStream<T>, Materializer, Mat) {
        let materializer = Materializer::new();
        let (stream, mat) = Arc::clone(&source.factory)
            .create(&materializer)
            .expect("stream materializes");
        (stream, materializer, mat)
    }

    // Timestamps are recorded upstream of the throttle; consecutive pulls must be at
    // least 20 ms apart for a 40 ms throttle period.
    #[test]
    fn throttle_shaping_spaces_elements() {
        let (tx, rx) = mpsc::channel();
        Source::from_iter(1..=3)
            .map(move |_| {
                tx.send(Instant::now()).unwrap();
            })
            .throttle(1, Duration::from_millis(40), 0, ThrottleMode::Shaping)
            .run_with(Sink::ignore())
            .unwrap()
            .wait()
            .unwrap();

        let first = rx.recv_timeout(Duration::from_millis(250)).unwrap();
        let second = rx.recv_timeout(Duration::from_millis(250)).unwrap();
        let third = rx.recv_timeout(Duration::from_millis(250)).unwrap();
        assert!(second.duration_since(first) >= Duration::from_millis(20));
        assert!(third.duration_since(second) >= Duration::from_millis(20));
    }

    // Enforcing mode fails the stream instead of slowing it down.
    #[test]
    fn throttle_enforcing_fails_when_rate_exceeded() {
        let result = Source::from_iter(1..=3)
            .throttle(1, Duration::from_millis(80), 1, ThrottleMode::Enforcing)
            .run_collect();
        assert_eq!(
            result,
            Err(StreamError::Failed(
                "Maximum throttle throughput exceeded.".to_owned()
            ))
        );
    }

    // Order is preserved and the run takes at least half of the configured delay.
    #[test]
    fn delay_delays_first_element_without_reordering() {
        let start = Instant::now();
        let items = Source::from_iter([1, 2])
            .delay(
                Duration::from_millis(40),
                DelayOverflowStrategy::Backpressure,
            )
            .run_collect()
            .unwrap();
        assert_eq!(items, vec![1, 2]);
        assert!(start.elapsed() >= Duration::from_millis(20));
    }

    // With spare buffer capacity, EmitEarly still delays the elements.
    #[test]
    fn delay_emit_early_preserves_elements_when_buffer_not_full() {
        let start = Instant::now();
        let items = Source::from_iter([1, 2])
            .delay(Duration::from_millis(40), DelayOverflowStrategy::EmitEarly)
            .run_collect()
            .unwrap();
        assert_eq!(items, vec![1, 2]);
        assert!(start.elapsed() >= Duration::from_millis(20));
    }

    // 17 elements are one more than `DELAY_BUFFER_CAPACITY`, so the first element must be
    // emitted long before the 2 s delay.
    #[test]
    fn delay_emit_early_flushes_oldest_when_buffer_full() {
        let delay = Duration::from_secs(2);
        let (mut stream, materializer, _mat) = materialize_stream(
            Source::from_iter(1..=17).delay(delay, DelayOverflowStrategy::EmitEarly),
        );

        let start = Instant::now();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert!(
            start.elapsed() < Duration::from_millis(750),
            "EmitEarly should flush well before the configured delay under load"
        );

        let mut items = vec![1];
        for item in stream {
            items.push(item.unwrap());
        }
        assert_eq!(items, (1..=17).collect::<Vec<_>>());
        materializer.shutdown();
    }

    // The single element must not arrive before (half of) the initial delay.
    #[test]
    fn initial_delay_holds_back_first_pull() {
        let start = Instant::now();
        let result = Source::single(42)
            .initial_delay(Duration::from_millis(40))
            .run_with(Sink::head())
            .unwrap();
        assert_eq!(result.wait().unwrap(), 42);
        assert!(start.elapsed() >= Duration::from_millis(20));
    }

    // Fewer elements than `max_number` are still emitted as a group.
    #[test]
    fn grouped_within_flushes_on_timer() {
        let (publisher, subscriber) = TestSource::probe::<i32>()
            .grouped_within(8, Duration::from_millis(40))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        publisher.expect_request();
        publisher.send_next(1);
        publisher.expect_request();
        publisher.send_next(2);

        subscriber.request(1);
        assert_eq!(subscriber.expect_next(), vec![1, 2]);
    }

    // Weights are string lengths; "bbb" would push the first group past 4.
    #[test]
    fn grouped_weighted_within_flushes_on_weight() {
        let result = Source::from_iter(["aa", "bbb", "c"])
            .grouped_weighted_within(4, Duration::from_secs(1), |s| s.len() as u64)
            .run_collect()
            .unwrap();
        assert_eq!(result, vec![vec!["aa"], vec!["bbb", "c"]]);
    }

    // Element 1 is sent inside the 40 ms window and dropped; element 2 is sent after it.
    #[test]
    fn drop_within_drops_early_elements() {
        let (publisher, subscriber) = TestSource::probe::<i32>()
            .drop_within(Duration::from_millis(40))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        publisher.expect_request();
        publisher.send_next(1);
        thread::sleep(Duration::from_millis(120));
        publisher.expect_request();
        publisher.send_next(2);
        publisher.send_complete();

        subscriber.request(2);
        assert_eq!(subscriber.expect_next(), 2);
        subscriber.expect_complete();
    }

    // The stream completes on its own although upstream never completes.
    #[test]
    fn take_within_completes_after_timeout() {
        let (publisher, subscriber) = TestSource::probe::<i32>()
            .take_within(Duration::from_millis(40))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        publisher.expect_request();
        publisher.send_next(1);
        subscriber.request(2);
        assert_eq!(subscriber.expect_next(), 1);
        subscriber.expect_complete();
    }

    // No second element follows the first, so the idle timer fails the stream.
    #[test]
    fn idle_timeout_fails_on_gap() {
        let (publisher, subscriber) = TestSource::probe::<i32>()
            .idle_timeout(Duration::from_millis(40))
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        publisher.expect_request();
        publisher.send_next(1);
        subscriber.request(2);
        assert_eq!(subscriber.expect_next(), 1);
        assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
    }

    // Element 2 sits unrequested for longer than the timeout.
    #[test]
    fn backpressure_timeout_fails_without_demand() {
        let (publisher, subscriber) = TestSource::probe::<i32>()
            .backpressure_timeout(LOAD_TIMEOUT)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        subscriber.request(1);
        publisher.expect_request();
        publisher.send_next(1);
        assert_eq!(subscriber.expect_next(), 1);
        publisher.expect_request();
        publisher.send_next(2);
        thread::sleep(LOAD_GAP);
        subscriber.request(1);
        assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
    }

    // A source that never completes must be failed by the completion timeout.
    #[test]
    fn completion_timeout_fails_unfinished_stream() {
        let result = Source::<i32>::never()
            .completion_timeout(Duration::from_millis(40))
            .run_collect();
        assert!(matches!(result, Err(StreamError::Failed(_))));
    }

    // A source that never emits must be failed by the initial timeout.
    #[test]
    fn initial_timeout_fails_before_first_element() {
        let (mut stream, materializer, _mat) =
            materialize_stream(Source::<i32>::never().initial_timeout(Duration::from_millis(40)));
        assert!(matches!(stream.next(), Some(Err(StreamError::Failed(_)))));
        materializer.shutdown();
    }

    // After the real element, the injected `0` arrives once upstream stays silent.
    #[test]
    fn keep_alive_injects_on_idle_gap() {
        let (publisher, subscriber) = TestSource::probe::<i32>()
            .keep_alive(LOAD_TIMEOUT, || 0)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        subscriber.request(3);
        publisher.expect_request();
        publisher.send_next(1);
        assert_eq!(subscriber.expect_next(), 1);
        assert_eq!(subscriber.expect_next(), 0);
    }

    // Element 2 occupies the slot for longer than the timeout, then is taken; the
    // keep-alive element must still show up afterwards.
    #[test]
    fn keep_alive_rearms_after_slow_consumer_drains_slot() {
        let (publisher, subscriber) = TestSource::probe::<i32>()
            .keep_alive(LOAD_TIMEOUT, || 0)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        subscriber.request(1);
        publisher.expect_request();
        publisher.send_next(1);
        assert_eq!(subscriber.expect_next(), 1);

        publisher.expect_request();
        publisher.send_next(2);
        thread::sleep(LOAD_GAP);

        subscriber.request(1);
        assert_eq!(subscriber.expect_next(), 2);

        thread::sleep(LOAD_GAP);
        subscriber.request(1);
        assert_eq!(subscriber.expect_next(), 0);
    }

    // A `maximum_burst` below -1 is rejected with a panic.
    #[test]
    #[should_panic(expected = "maximum_burst must be -1 or greater")]
    fn throttle_rejects_invalid_negative_maximum_burst() {
        let _ = Source::single(1)
            .throttle(1, Duration::from_millis(40), -2, ThrottleMode::Shaping)
            .run_collect();
    }

    // Both pulls after the 100 ms sleep yield the element; after shutdown the stream ends.
    #[test]
    fn tick_drops_missed_ticks_and_cancels() {
        let (mut stream, materializer, _cancellable) = materialize_stream(Source::tick(
            Duration::from_millis(20),
            Duration::from_millis(20),
            7_i32,
        ));

        thread::sleep(Duration::from_millis(100));
        assert_eq!(stream.next(), Some(Ok(7)));
        assert_eq!(stream.next(), Some(Ok(7)));
        materializer.shutdown();
        assert_eq!(stream.next(), None);
    }
}