1use super::*;
2use crate::stream::runtime::current_stream_cancelled;
3use std::collections::VecDeque;
4use std::mem;
5use std::panic::{AssertUnwindSafe, catch_unwind};
6use std::time::Instant;
7
8const DELAY_BUFFER_CAPACITY: usize = 16;
9const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum ThrottleMode {
13 Shaping,
14 Enforcing,
15}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum DelayOverflowStrategy {
19 EmitEarly,
20 DropHead,
21 DropTail,
22 DropBuffer,
23 DropNew,
24 Backpressure,
25 Fail,
26}
27
28#[derive(Clone)]
29enum TerminalSignal {
30 Complete,
31 Error(StreamError),
32}
33
34struct DelayQueueShared<T> {
35 state: Mutex<DelayQueueState<T>>,
36 available: Condvar,
37 cancelled: Arc<AtomicBool>,
38}
39
40struct DelayQueueState<T> {
41 queue: VecDeque<(Instant, T)>,
42 terminal: Option<TerminalSignal>,
43}
44
45impl<T> DelayQueueShared<T> {
46 fn new() -> Arc<Self> {
47 Arc::new(Self {
48 state: Mutex::new(DelayQueueState {
49 queue: VecDeque::new(),
50 terminal: None,
51 }),
52 available: Condvar::new(),
53 cancelled: Arc::new(AtomicBool::new(false)),
54 })
55 }
56}
57
58struct DelayQueueStream<T> {
59 shared: Arc<DelayQueueShared<T>>,
60 completion: Option<StreamCompletion<NotUsed>>,
61}
62
63impl<T> Iterator for DelayQueueStream<T> {
64 type Item = StreamResult<T>;
65
66 fn next(&mut self) -> Option<Self::Item> {
67 let mut state = self
68 .shared
69 .state
70 .lock()
71 .unwrap_or_else(|poison| poison.into_inner());
72 loop {
73 if let Some((deadline, _)) = state.queue.front() {
74 let now = Instant::now();
75 if *deadline <= now {
76 let (_, item) = state.queue.pop_front().expect("front element present");
77 drop(state);
78 self.shared.available.notify_all();
79 return Some(Ok(item));
80 }
81 let wait = (*deadline - now).min(WAIT_POLL_INTERVAL);
82 let (next, _) = self
83 .shared
84 .available
85 .wait_timeout(state, wait)
86 .unwrap_or_else(|poison| poison.into_inner());
87 state = next;
88 continue;
89 }
90
91 if let Some(terminal) = state.terminal.clone() {
92 return match terminal {
93 TerminalSignal::Complete => None,
94 TerminalSignal::Error(error) => Some(Err(error)),
95 };
96 }
97 if self.shared.cancelled.load(Ordering::SeqCst) {
98 return Some(Err(StreamError::Cancelled));
99 }
100
101 let (next, _) = self
102 .shared
103 .available
104 .wait_timeout(state, WAIT_POLL_INTERVAL)
105 .unwrap_or_else(|poison| poison.into_inner());
106 state = next;
107 }
108 }
109}
110
111impl<T> Drop for DelayQueueStream<T> {
112 fn drop(&mut self) {
113 self.shared.cancelled.store(true, Ordering::SeqCst);
114 self.shared.available.notify_all();
115 let _ = self.completion.take();
116 }
117}
118
119struct SlotShared<T, Extra> {
120 state: Mutex<SlotState<T, Extra>>,
121 available: Condvar,
122 cancelled: Arc<AtomicBool>,
123}
124
125struct SlotState<T, Extra> {
126 slot: Option<T>,
127 terminal: Option<TerminalSignal>,
128 extra: Extra,
129}
130
131impl<T, Extra> SlotShared<T, Extra> {
132 fn new(extra: Extra) -> Arc<Self> {
133 Arc::new(Self {
134 state: Mutex::new(SlotState {
135 slot: None,
136 terminal: None,
137 extra,
138 }),
139 available: Condvar::new(),
140 cancelled: Arc::new(AtomicBool::new(false)),
141 })
142 }
143}
144
145struct SlotStream<T, Extra> {
146 shared: Arc<SlotShared<T, Extra>>,
147 completion: Option<StreamCompletion<NotUsed>>,
148}
149
150impl<T, Extra> Iterator for SlotStream<T, Extra> {
151 type Item = StreamResult<T>;
152
153 fn next(&mut self) -> Option<Self::Item> {
154 let mut state = self
155 .shared
156 .state
157 .lock()
158 .unwrap_or_else(|poison| poison.into_inner());
159 loop {
160 if let Some(item) = state.slot.take() {
161 drop(state);
162 self.shared.available.notify_all();
163 return Some(Ok(item));
164 }
165 if let Some(terminal) = state.terminal.clone() {
166 return match terminal {
167 TerminalSignal::Complete => None,
168 TerminalSignal::Error(error) => Some(Err(error)),
169 };
170 }
171 if self.shared.cancelled.load(Ordering::SeqCst) {
172 return Some(Err(StreamError::Cancelled));
173 }
174 let (next, _) = self
175 .shared
176 .available
177 .wait_timeout(state, WAIT_POLL_INTERVAL)
178 .unwrap_or_else(|poison| poison.into_inner());
179 state = next;
180 }
181 }
182}
183
184impl<T, Extra> Drop for SlotStream<T, Extra> {
185 fn drop(&mut self) {
186 self.shared.cancelled.store(true, Ordering::SeqCst);
187 self.shared.available.notify_all();
188 let _ = self.completion.take();
189 }
190}
191
192fn finish_delay_queue<T>(shared: &DelayQueueShared<T>, terminal: TerminalSignal) {
193 let mut state = shared
194 .state
195 .lock()
196 .unwrap_or_else(|poison| poison.into_inner());
197 if state.terminal.is_none() {
198 state.terminal = Some(terminal);
199 }
200 drop(state);
201 shared.available.notify_all();
202}
203
204fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
205 let mut state = shared
206 .state
207 .lock()
208 .unwrap_or_else(|poison| poison.into_inner());
209 if state.terminal.is_none() {
210 state.terminal = Some(terminal);
211 }
212 drop(state);
213 shared.available.notify_all();
214}
215
216struct QueuePanicGuard<T> {
217 shared: Arc<DelayQueueShared<T>>,
218 armed: bool,
219}
220
221impl<T> QueuePanicGuard<T> {
222 fn new(shared: Arc<DelayQueueShared<T>>) -> Self {
223 Self {
224 shared,
225 armed: true,
226 }
227 }
228
229 fn disarm(&mut self) {
230 self.armed = false;
231 }
232}
233
234impl<T> Drop for QueuePanicGuard<T> {
235 fn drop(&mut self) {
236 if self.armed {
237 finish_delay_queue(
238 &self.shared,
239 TerminalSignal::Error(StreamError::AbruptTermination),
240 );
241 }
242 }
243}
244
245struct SlotPanicGuard<T, Extra> {
246 shared: Arc<SlotShared<T, Extra>>,
247 armed: bool,
248}
249
250impl<T, Extra> SlotPanicGuard<T, Extra> {
251 fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
252 Self {
253 shared,
254 armed: true,
255 }
256 }
257
258 fn disarm(&mut self) {
259 self.armed = false;
260 }
261}
262
263impl<T, Extra> Drop for SlotPanicGuard<T, Extra> {
264 fn drop(&mut self) {
265 if self.armed {
266 finish_slot(
267 &self.shared,
268 TerminalSignal::Error(StreamError::AbruptTermination),
269 );
270 }
271 }
272}
273
274fn cloned_materializer(materializer: &Materializer) -> Materializer {
275 materializer.with_name_prefix(Arc::from(materializer.name_prefix()))
276}
277
278fn wait_for_timer(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
279 if delay.is_zero() {
280 return Ok(());
281 }
282 let gate = Arc::new((Mutex::new(false), Condvar::new()));
283 let gate_task = Arc::clone(&gate);
284 let _timer = materializer.schedule_once(delay, move || {
285 let (lock, condvar) = &*gate_task;
286 let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
287 *done = true;
288 drop(done);
289 condvar.notify_all();
290 });
291
292 let (lock, condvar) = &*gate;
293 let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
294 while !*done {
295 if materializer.is_shutdown() {
296 return Err(StreamError::AbruptTermination);
297 }
298 if current_stream_cancelled()
299 .as_ref()
300 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
301 {
302 return Err(StreamError::Cancelled);
303 }
304 let (next, _) = condvar
305 .wait_timeout(done, WAIT_POLL_INTERVAL)
306 .unwrap_or_else(|poison| poison.into_inner());
307 done = next;
308 }
309 Ok(())
310}
311
312#[derive(Debug)]
313struct TokenBucket {
314 available: f64,
315 last: Instant,
316 capacity: f64,
317 nanos_between_tokens: f64,
318}
319
320impl TokenBucket {
321 fn new(cost: u64, per: Duration, maximum_burst: i32) -> Self {
322 assert!(cost > 0, "throttle cost must be greater than zero");
323 assert!(
324 per > Duration::ZERO,
325 "throttle period must be greater than zero"
326 );
327 assert!(
328 per.as_nanos() >= u128::from(cost),
329 "Rates larger than 1 unit / nanosecond are not supported"
330 );
331 assert!(maximum_burst >= -1, "maximum_burst must be -1 or greater");
332 let nanos_between_tokens = per.as_nanos() as f64 / cost as f64;
333 let capacity = if maximum_burst == -1 {
334 let automatic = ((100_000_000_f64 / nanos_between_tokens).max(1.0)).floor();
335 automatic.max(1.0)
336 } else {
337 maximum_burst as f64
338 };
339 Self {
340 available: capacity,
341 last: Instant::now(),
342 capacity,
343 nanos_between_tokens,
344 }
345 }
346
347 fn offer(&mut self, cost: u64) -> Duration {
348 let now = Instant::now();
349 if now > self.last {
350 let elapsed = now.duration_since(self.last).as_nanos() as f64;
351 let replenished = elapsed / self.nanos_between_tokens;
352 self.available = (self.available + replenished).min(self.capacity);
353 self.last = now;
354 }
355
356 let cost = cost as f64;
357 if self.available >= cost {
358 self.available -= cost;
359 Duration::ZERO
360 } else {
361 let needed = cost - self.available;
362 self.available = 0.0;
363 let delay_nanos = (needed * self.nanos_between_tokens).ceil() as u64;
364 self.last = now + Duration::from_nanos(delay_nanos);
365 Duration::from_nanos(delay_nanos)
366 }
367 }
368}
369
370fn schedule_notify<T>(
371 materializer: &Materializer,
372 shared: Arc<DelayQueueShared<T>>,
373 delay: Duration,
374) where
375 T: Send + 'static,
376{
377 if delay.is_zero() {
378 shared.available.notify_all();
379 return;
380 }
381 let _timer = materializer.schedule_once(delay, move || {
382 shared.available.notify_all();
383 });
384}
385
386fn delay_stage<Out, Supplier, Strategy>(
387 input: BoxStream<Out>,
388 delay_strategy_supplier: Arc<Supplier>,
389 overflow_strategy: DelayOverflowStrategy,
390 materializer: &Materializer,
391) -> StreamResult<BoxStream<Out>>
392where
393 Out: Send + 'static,
394 Supplier: Fn() -> Strategy + Send + Sync + 'static,
395 Strategy: FnMut(&Out) -> Duration + Send + 'static,
396{
397 let shared = DelayQueueShared::new();
398 let producer_shared = Arc::clone(&shared);
399 let cancelled = Arc::clone(&shared.cancelled);
400 let state = Arc::clone(&materializer.inner.state);
401 let materializer = cloned_materializer(materializer);
402 let task_materializer = materializer.clone();
403 let completion = materializer.spawn_stream(move |_| {
404 let mut panic_guard = QueuePanicGuard::new(Arc::clone(&producer_shared));
405 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
406 let mut delay_strategy = delay_strategy_supplier();
407
408 loop {
409 if cancelled.load(Ordering::SeqCst) {
410 panic_guard.disarm();
411 return Ok(NotUsed);
412 }
413
414 match input.next() {
415 Some(Ok(item)) => {
416 let delay = match catch_unwind(AssertUnwindSafe(|| delay_strategy(&item))) {
417 Ok(delay) => delay,
418 Err(_) => {
419 panic_guard.disarm();
420 finish_delay_queue(
421 &producer_shared,
422 TerminalSignal::Error(StreamError::AbruptTermination),
423 );
424 return Ok(NotUsed);
425 }
426 };
427
428 let deadline = Instant::now() + delay;
429 let mut state = producer_shared
430 .state
431 .lock()
432 .unwrap_or_else(|poison| poison.into_inner());
433
434 match overflow_strategy {
435 DelayOverflowStrategy::Backpressure => {
436 while state.queue.len() == DELAY_BUFFER_CAPACITY
437 && !cancelled.load(Ordering::SeqCst)
438 {
439 state = producer_shared
440 .available
441 .wait(state)
442 .unwrap_or_else(|poison| poison.into_inner());
443 }
444 if cancelled.load(Ordering::SeqCst) {
445 panic_guard.disarm();
446 return Ok(NotUsed);
447 }
448 let was_empty = state.queue.is_empty();
449 state.queue.push_back((deadline, item));
450 drop(state);
451 if was_empty {
452 schedule_notify(
453 &task_materializer,
454 Arc::clone(&producer_shared),
455 delay,
456 );
457 }
458 producer_shared.available.notify_all();
459 }
460 DelayOverflowStrategy::DropHead => {
461 if state.queue.len() == DELAY_BUFFER_CAPACITY {
462 let _ = state.queue.pop_front();
463 }
464 let was_empty = state.queue.is_empty();
465 state.queue.push_back((deadline, item));
466 drop(state);
467 if was_empty {
468 schedule_notify(
469 &task_materializer,
470 Arc::clone(&producer_shared),
471 delay,
472 );
473 }
474 producer_shared.available.notify_all();
475 }
476 DelayOverflowStrategy::DropTail => {
477 if state.queue.len() == DELAY_BUFFER_CAPACITY {
478 let _ = state.queue.pop_back();
479 }
480 let was_empty = state.queue.is_empty();
481 state.queue.push_back((deadline, item));
482 drop(state);
483 if was_empty {
484 schedule_notify(
485 &task_materializer,
486 Arc::clone(&producer_shared),
487 delay,
488 );
489 }
490 producer_shared.available.notify_all();
491 }
492 DelayOverflowStrategy::DropBuffer => {
493 if state.queue.len() == DELAY_BUFFER_CAPACITY {
494 state.queue.clear();
495 }
496 let was_empty = state.queue.is_empty();
497 state.queue.push_back((deadline, item));
498 drop(state);
499 if was_empty {
500 schedule_notify(
501 &task_materializer,
502 Arc::clone(&producer_shared),
503 delay,
504 );
505 }
506 producer_shared.available.notify_all();
507 }
508 DelayOverflowStrategy::DropNew => {
509 if state.queue.len() < DELAY_BUFFER_CAPACITY {
510 let was_empty = state.queue.is_empty();
511 state.queue.push_back((deadline, item));
512 drop(state);
513 if was_empty {
514 schedule_notify(
515 &task_materializer,
516 Arc::clone(&producer_shared),
517 delay,
518 );
519 }
520 producer_shared.available.notify_all();
521 }
522 }
523 DelayOverflowStrategy::Fail => {
524 if state.queue.len() == DELAY_BUFFER_CAPACITY {
525 state.queue.clear();
526 drop(state);
527 panic_guard.disarm();
528 finish_delay_queue(
529 &producer_shared,
530 TerminalSignal::Error(StreamError::Failed(format!(
531 "Buffer overflow for delay operator (max capacity was: {DELAY_BUFFER_CAPACITY})!"
532 ))),
533 );
534 return Ok(NotUsed);
535 }
536 let was_empty = state.queue.is_empty();
537 state.queue.push_back((deadline, item));
538 drop(state);
539 if was_empty {
540 schedule_notify(
541 &task_materializer,
542 Arc::clone(&producer_shared),
543 delay,
544 );
545 }
546 producer_shared.available.notify_all();
547 }
548 DelayOverflowStrategy::EmitEarly => {
549 if state.queue.len() == DELAY_BUFFER_CAPACITY {
550 if let Some((early_deadline, _)) = state.queue.front_mut() {
551 *early_deadline = Instant::now();
552 }
553 drop(state);
554 producer_shared.available.notify_all();
555 state = producer_shared
556 .state
557 .lock()
558 .unwrap_or_else(|poison| poison.into_inner());
559 while state.queue.len() == DELAY_BUFFER_CAPACITY
560 && !cancelled.load(Ordering::SeqCst)
561 {
562 state = producer_shared
563 .available
564 .wait(state)
565 .unwrap_or_else(|poison| poison.into_inner());
566 }
567 if cancelled.load(Ordering::SeqCst) {
568 panic_guard.disarm();
569 return Ok(NotUsed);
570 }
571 }
572 let was_empty = state.queue.is_empty();
573 state.queue.push_back((deadline, item));
574 drop(state);
575 if was_empty {
576 schedule_notify(
577 &task_materializer,
578 Arc::clone(&producer_shared),
579 delay,
580 );
581 }
582 producer_shared.available.notify_all();
583 }
584 }
585 }
586 Some(Err(error)) => {
587 panic_guard.disarm();
588 finish_delay_queue(&producer_shared, TerminalSignal::Error(error));
589 return Ok(NotUsed);
590 }
591 None => {
592 panic_guard.disarm();
593 finish_delay_queue(&producer_shared, TerminalSignal::Complete);
594 return Ok(NotUsed);
595 }
596 }
597 }
598 });
599
600 Ok(Box::new(DelayQueueStream {
601 shared,
602 completion: Some(completion),
603 }))
604}
605
606struct ThrottleStream<Out, CostFn> {
607 input: BoxStream<Out>,
608 materializer: Materializer,
609 token_bucket: TokenBucket,
610 cost_fn: Arc<CostFn>,
611 mode: ThrottleMode,
612 terminal: Option<TerminalSignal>,
613}
614
615impl<Out, CostFn> Iterator for ThrottleStream<Out, CostFn>
616where
617 Out: Send + 'static,
618 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
619{
620 type Item = StreamResult<Out>;
621
622 fn next(&mut self) -> Option<Self::Item> {
623 if let Some(terminal) = self.terminal.clone() {
624 return match terminal {
625 TerminalSignal::Complete => None,
626 TerminalSignal::Error(error) => Some(Err(error)),
627 };
628 }
629
630 match self.input.next() {
631 Some(Ok(item)) => {
632 let cost = match catch_unwind(AssertUnwindSafe(|| (self.cost_fn)(&item))) {
633 Ok(cost) => cost,
634 Err(_) => {
635 self.terminal = Some(TerminalSignal::Error(StreamError::AbruptTermination));
636 return Some(Err(StreamError::AbruptTermination));
637 }
638 };
639 let delay = self.token_bucket.offer(cost);
640 if delay.is_zero() {
641 Some(Ok(item))
642 } else if self.mode == ThrottleMode::Enforcing {
643 let error =
644 StreamError::Failed("Maximum throttle throughput exceeded.".to_owned());
645 self.terminal = Some(TerminalSignal::Error(error.clone()));
646 Some(Err(error))
647 } else {
648 match wait_for_timer(&self.materializer, delay) {
649 Ok(()) => Some(Ok(item)),
650 Err(error) => {
651 self.terminal = Some(TerminalSignal::Error(error.clone()));
652 Some(Err(error))
653 }
654 }
655 }
656 }
657 Some(Err(error)) => {
658 self.terminal = Some(TerminalSignal::Error(error.clone()));
659 Some(Err(error))
660 }
661 None => {
662 self.terminal = Some(TerminalSignal::Complete);
663 None
664 }
665 }
666 }
667}
668
669struct GroupedShared<T> {
670 state: Mutex<GroupedState<T>>,
671 available: Condvar,
672 cancelled: Arc<AtomicBool>,
673}
674
675struct GroupedState<T> {
676 ready: VecDeque<Vec<T>>,
677 current: Vec<T>,
678 current_weight: u64,
679 generation: u64,
680 terminal: Option<TerminalSignal>,
681}
682
683impl<T> GroupedShared<T> {
684 fn new() -> Arc<Self> {
685 Arc::new(Self {
686 state: Mutex::new(GroupedState {
687 ready: VecDeque::new(),
688 current: Vec::new(),
689 current_weight: 0,
690 generation: 0,
691 terminal: None,
692 }),
693 available: Condvar::new(),
694 cancelled: Arc::new(AtomicBool::new(false)),
695 })
696 }
697}
698
699struct GroupedStream<T> {
700 shared: Arc<GroupedShared<T>>,
701 completion: Option<StreamCompletion<NotUsed>>,
702}
703
704impl<T> Iterator for GroupedStream<T> {
705 type Item = StreamResult<Vec<T>>;
706
707 fn next(&mut self) -> Option<Self::Item> {
708 let mut state = self
709 .shared
710 .state
711 .lock()
712 .unwrap_or_else(|poison| poison.into_inner());
713 loop {
714 if let Some(group) = state.ready.pop_front() {
715 drop(state);
716 self.shared.available.notify_all();
717 return Some(Ok(group));
718 }
719 if let Some(terminal) = state.terminal.clone() {
720 return match terminal {
721 TerminalSignal::Complete => None,
722 TerminalSignal::Error(error) => Some(Err(error)),
723 };
724 }
725 if self.shared.cancelled.load(Ordering::SeqCst) {
726 return Some(Err(StreamError::Cancelled));
727 }
728 let (next, _) = self
729 .shared
730 .available
731 .wait_timeout(state, WAIT_POLL_INTERVAL)
732 .unwrap_or_else(|poison| poison.into_inner());
733 state = next;
734 }
735 }
736}
737
738impl<T> Drop for GroupedStream<T> {
739 fn drop(&mut self) {
740 self.shared.cancelled.store(true, Ordering::SeqCst);
741 self.shared.available.notify_all();
742 let _ = self.completion.take();
743 }
744}
745
746fn finish_grouped<T>(shared: &GroupedShared<T>, terminal: TerminalSignal) {
747 let mut state = shared
748 .state
749 .lock()
750 .unwrap_or_else(|poison| poison.into_inner());
751 if state.terminal.is_none() {
752 if !state.current.is_empty() {
753 let current = mem::take(&mut state.current);
754 state.ready.push_back(current);
755 state.current_weight = 0;
756 state.generation = state.generation.wrapping_add(1);
757 }
758 state.terminal = Some(terminal);
759 }
760 drop(state);
761 shared.available.notify_all();
762}
763
764fn arm_grouped_timer<T: Send + 'static>(
765 materializer: &Materializer,
766 shared: Arc<GroupedShared<T>>,
767 interval: Duration,
768 generation: u64,
769) {
770 let _timer = materializer.schedule_once(interval, move || {
771 let mut state = shared
772 .state
773 .lock()
774 .unwrap_or_else(|poison| poison.into_inner());
775 if state.terminal.is_some() || state.generation != generation || state.current.is_empty() {
776 return;
777 }
778 let current = mem::take(&mut state.current);
779 state.ready.push_back(current);
780 state.current_weight = 0;
781 state.generation = state.generation.wrapping_add(1);
782 drop(state);
783 shared.available.notify_all();
784 });
785}
786
787fn grouped_weighted_within_stage<Out, CostFn>(
788 input: BoxStream<Out>,
789 max_weight: u64,
790 max_number: usize,
791 interval: Duration,
792 cost_fn: Arc<CostFn>,
793 materializer: &Materializer,
794) -> StreamResult<BoxStream<Vec<Out>>>
795where
796 Out: Send + 'static,
797 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
798{
799 assert!(
800 max_weight > 0,
801 "grouped_weighted_within max_weight must be greater than zero"
802 );
803 assert!(
804 max_number > 0,
805 "grouped_weighted_within max_number must be greater than zero"
806 );
807 assert!(
808 interval > Duration::ZERO,
809 "grouped_weighted_within interval must be greater than zero"
810 );
811
812 let shared = GroupedShared::new();
813 let producer_shared = Arc::clone(&shared);
814 let cancelled = Arc::clone(&shared.cancelled);
815 let state = Arc::clone(&materializer.inner.state);
816 let materializer = cloned_materializer(materializer);
817 let task_materializer = materializer.clone();
818 let completion = materializer.spawn_stream(move |_| {
819 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
820 loop {
821 if cancelled.load(Ordering::SeqCst) {
822 return Ok(NotUsed);
823 }
824
825 match input.next() {
826 Some(Ok(item)) => {
827 let weight = match catch_unwind(AssertUnwindSafe(|| (cost_fn)(&item))) {
828 Ok(weight) => weight,
829 Err(_) => {
830 finish_grouped(
831 &producer_shared,
832 TerminalSignal::Error(StreamError::AbruptTermination),
833 );
834 return Ok(NotUsed);
835 }
836 };
837
838 let mut state = producer_shared
839 .state
840 .lock()
841 .unwrap_or_else(|poison| poison.into_inner());
842 if state.current.is_empty() {
843 state.current.push(item);
844 state.current_weight = weight;
845 state.generation = state.generation.wrapping_add(1);
846 let generation = state.generation;
847 drop(state);
848 arm_grouped_timer(
849 &task_materializer,
850 Arc::clone(&producer_shared),
851 interval,
852 generation,
853 );
854 producer_shared.available.notify_all();
855 continue;
856 }
857
858 let fits = state.current_weight.saturating_add(weight) <= max_weight
859 && state.current.len() < max_number;
860 if fits {
861 state.current.push(item);
862 state.current_weight = state.current_weight.saturating_add(weight);
863 if state.current_weight >= max_weight || state.current.len() >= max_number {
864 let current = mem::take(&mut state.current);
865 state.ready.push_back(current);
866 state.current_weight = 0;
867 state.generation = state.generation.wrapping_add(1);
868 }
869 drop(state);
870 producer_shared.available.notify_all();
871 continue;
872 }
873
874 let current = mem::take(&mut state.current);
875 state.ready.push_back(current);
876 state.current_weight = 0;
877 state.generation = state.generation.wrapping_add(1);
878 let heavy_alone = weight > max_weight;
879 if heavy_alone {
880 state.ready.push_back(vec![item]);
881 } else {
882 state.current.push(item);
883 state.current_weight = weight;
884 state.generation = state.generation.wrapping_add(1);
885 let generation = state.generation;
886 drop(state);
887 arm_grouped_timer(
888 &task_materializer,
889 Arc::clone(&producer_shared),
890 interval,
891 generation,
892 );
893 producer_shared.available.notify_all();
894 continue;
895 }
896 drop(state);
897 producer_shared.available.notify_all();
898 }
899 Some(Err(error)) => {
900 finish_grouped(&producer_shared, TerminalSignal::Error(error));
901 return Ok(NotUsed);
902 }
903 None => {
904 finish_grouped(&producer_shared, TerminalSignal::Complete);
905 return Ok(NotUsed);
906 }
907 }
908 }
909 });
910
911 Ok(Box::new(GroupedStream {
912 shared,
913 completion: Some(completion),
914 }))
915}
916
917struct ForwardExtra {
918 generation: u64,
919}
920
921fn forward_slot_stage<Out, Setup, OnItem>(
922 input: BoxStream<Out>,
923 materializer: &Materializer,
924 setup: Setup,
925 on_item: OnItem,
926) -> StreamResult<BoxStream<Out>>
927where
928 Out: Send + 'static,
929 Setup:
930 FnOnce(Arc<SlotShared<Out, ForwardExtra>>, Arc<AtomicBool>, &Materializer) + Send + 'static,
931 OnItem: Fn(&Arc<SlotShared<Out, ForwardExtra>>, &Materializer, &Out) -> StreamResult<()>
932 + Send
933 + Sync
934 + 'static,
935{
936 let shared = SlotShared::new(ForwardExtra { generation: 0 });
937 let producer_shared = Arc::clone(&shared);
938 let cancelled = Arc::clone(&shared.cancelled);
939 let state = Arc::clone(&materializer.inner.state);
940 let materializer = cloned_materializer(materializer);
941 let task_materializer = materializer.clone();
942 setup(
943 Arc::clone(&producer_shared),
944 Arc::clone(&cancelled),
945 &materializer,
946 );
947 let on_item = Arc::new(on_item);
948 let completion = materializer.spawn_stream(move |_| {
949 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
950 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
951 loop {
952 if cancelled.load(Ordering::SeqCst) {
953 panic_guard.disarm();
954 return Ok(NotUsed);
955 }
956 match input.next() {
957 Some(Ok(item)) => {
958 on_item(&producer_shared, &task_materializer, &item)?;
959 let mut state = producer_shared
960 .state
961 .lock()
962 .unwrap_or_else(|poison| poison.into_inner());
963 while state.slot.is_some()
964 && state.terminal.is_none()
965 && !cancelled.load(Ordering::SeqCst)
966 {
967 state = producer_shared
968 .available
969 .wait(state)
970 .unwrap_or_else(|poison| poison.into_inner());
971 }
972 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
973 panic_guard.disarm();
974 return Ok(NotUsed);
975 }
976 state.slot = Some(item);
977 state.extra.generation = state.extra.generation.wrapping_add(1);
978 drop(state);
979 producer_shared.available.notify_all();
980 }
981 Some(Err(error)) => {
982 panic_guard.disarm();
983 finish_slot(&producer_shared, TerminalSignal::Error(error));
984 return Ok(NotUsed);
985 }
986 None => {
987 panic_guard.disarm();
988 finish_slot(&producer_shared, TerminalSignal::Complete);
989 return Ok(NotUsed);
990 }
991 }
992 }
993 });
994
995 Ok(Box::new(SlotStream {
996 shared,
997 completion: Some(completion),
998 }))
999}
1000
1001fn take_within_stage<Out: Send + 'static>(
1002 input: BoxStream<Out>,
1003 timeout: Duration,
1004 materializer: &Materializer,
1005) -> StreamResult<BoxStream<Out>> {
1006 assert!(
1007 timeout > Duration::ZERO,
1008 "take_within timeout must be greater than zero"
1009 );
1010 forward_slot_stage(
1011 input,
1012 materializer,
1013 move |shared, cancelled, materializer| {
1014 let _timer = materializer.schedule_once(timeout, move || {
1015 finish_slot(&shared, TerminalSignal::Complete);
1016 cancelled.store(true, Ordering::SeqCst);
1017 });
1018 },
1019 |_, _, _| Ok(()),
1020 )
1021}
1022
1023fn initial_delay_stage<Out: Send + 'static>(
1024 input: BoxStream<Out>,
1025 delay: Duration,
1026 materializer: &Materializer,
1027) -> StreamResult<BoxStream<Out>> {
1028 assert!(delay >= Duration::ZERO);
1029 let materializer = cloned_materializer(materializer);
1030 Ok(Box::new(InitialDelayStream {
1031 input,
1032 materializer,
1033 opened: delay.is_zero(),
1034 delay,
1035 terminal: None,
1036 }))
1037}
1038
1039struct InitialDelayStream<Out> {
1040 input: BoxStream<Out>,
1041 materializer: Materializer,
1042 opened: bool,
1043 delay: Duration,
1044 terminal: Option<TerminalSignal>,
1045}
1046
1047impl<Out: Send + 'static> Iterator for InitialDelayStream<Out> {
1048 type Item = StreamResult<Out>;
1049
1050 fn next(&mut self) -> Option<Self::Item> {
1051 if let Some(terminal) = self.terminal.clone() {
1052 return match terminal {
1053 TerminalSignal::Complete => None,
1054 TerminalSignal::Error(error) => Some(Err(error)),
1055 };
1056 }
1057 if !self.opened {
1058 if let Err(error) = wait_for_timer(&self.materializer, self.delay) {
1059 self.terminal = Some(TerminalSignal::Error(error.clone()));
1060 return Some(Err(error));
1061 }
1062 self.opened = true;
1063 }
1064 match self.input.next() {
1065 Some(Ok(item)) => Some(Ok(item)),
1066 Some(Err(error)) => {
1067 self.terminal = Some(TerminalSignal::Error(error.clone()));
1068 Some(Err(error))
1069 }
1070 None => {
1071 self.terminal = Some(TerminalSignal::Complete);
1072 None
1073 }
1074 }
1075 }
1076}
1077
1078fn arm_generation_failure<Out: Send + 'static>(
1079 materializer: &Materializer,
1080 shared: Arc<SlotShared<Out, ForwardExtra>>,
1081 timeout: Duration,
1082 message: &'static str,
1083 generation: u64,
1084 require_empty_slot: bool,
1085) {
1086 let _timer = materializer.schedule_once(timeout, move || {
1087 let should_fail = {
1088 let state = shared
1089 .state
1090 .lock()
1091 .unwrap_or_else(|poison| poison.into_inner());
1092 state.terminal.is_none()
1093 && state.extra.generation == generation
1094 && (!require_empty_slot || state.slot.is_none())
1095 };
1096 if should_fail {
1097 finish_slot(
1098 &shared,
1099 TerminalSignal::Error(StreamError::Failed(message.to_owned())),
1100 );
1101 shared.cancelled.store(true, Ordering::SeqCst);
1102 }
1103 });
1104}
1105
1106fn initial_timeout_stage<Out: Send + 'static>(
1107 input: BoxStream<Out>,
1108 timeout: Duration,
1109 materializer: &Materializer,
1110) -> StreamResult<BoxStream<Out>> {
1111 assert!(
1112 timeout > Duration::ZERO,
1113 "initial_timeout timeout must be greater than zero"
1114 );
1115 let shared = SlotShared::new(ForwardExtra { generation: 0 });
1116 let producer_shared = Arc::clone(&shared);
1117 let cancelled = Arc::clone(&shared.cancelled);
1118 let state = Arc::clone(&materializer.inner.state);
1119 let materializer = cloned_materializer(materializer);
1120 arm_generation_failure(
1121 &materializer,
1122 Arc::clone(&producer_shared),
1123 timeout,
1124 "The first element has not yet passed through before the initial timeout elapsed.",
1125 0,
1126 true,
1127 );
1128 let completion = materializer.spawn_stream(move |_| {
1129 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1130 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1131 loop {
1132 if cancelled.load(Ordering::SeqCst) {
1133 panic_guard.disarm();
1134 return Ok(NotUsed);
1135 }
1136 match input.next() {
1137 Some(Ok(item)) => {
1138 let mut state = producer_shared
1139 .state
1140 .lock()
1141 .unwrap_or_else(|poison| poison.into_inner());
1142 state.extra.generation = state.extra.generation.wrapping_add(1);
1143 while state.slot.is_some()
1144 && state.terminal.is_none()
1145 && !cancelled.load(Ordering::SeqCst)
1146 {
1147 state = producer_shared
1148 .available
1149 .wait(state)
1150 .unwrap_or_else(|poison| poison.into_inner());
1151 }
1152 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1153 panic_guard.disarm();
1154 return Ok(NotUsed);
1155 }
1156 state.slot = Some(item);
1157 drop(state);
1158 producer_shared.available.notify_all();
1159 }
1160 Some(Err(error)) => {
1161 panic_guard.disarm();
1162 finish_slot(&producer_shared, TerminalSignal::Error(error));
1163 return Ok(NotUsed);
1164 }
1165 None => {
1166 panic_guard.disarm();
1167 finish_slot(&producer_shared, TerminalSignal::Complete);
1168 return Ok(NotUsed);
1169 }
1170 }
1171 }
1172 });
1173
1174 Ok(Box::new(SlotStream {
1175 shared,
1176 completion: Some(completion),
1177 }))
1178}
1179
1180fn completion_timeout_stage<Out: Send + 'static>(
1181 input: BoxStream<Out>,
1182 timeout: Duration,
1183 materializer: &Materializer,
1184) -> StreamResult<BoxStream<Out>> {
1185 assert!(
1186 timeout > Duration::ZERO,
1187 "completion_timeout timeout must be greater than zero"
1188 );
1189 forward_slot_stage(
1190 input,
1191 materializer,
1192 move |shared, cancelled, materializer| {
1193 let _timer = materializer.schedule_once(timeout, move || {
1194 finish_slot(
1195 &shared,
1196 TerminalSignal::Error(StreamError::Failed(
1197 "The stream has not been completed before the completion timeout elapsed."
1198 .to_owned(),
1199 )),
1200 );
1201 cancelled.store(true, Ordering::SeqCst);
1202 });
1203 },
1204 |_, _, _| Ok(()),
1205 )
1206}
1207
1208fn idle_timeout_stage<Out: Send + 'static>(
1209 input: BoxStream<Out>,
1210 timeout: Duration,
1211 materializer: &Materializer,
1212) -> StreamResult<BoxStream<Out>> {
1213 assert!(
1214 timeout > Duration::ZERO,
1215 "idle_timeout timeout must be greater than zero"
1216 );
1217 let shared = SlotShared::new(ForwardExtra { generation: 0 });
1218 let producer_shared = Arc::clone(&shared);
1219 let cancelled = Arc::clone(&shared.cancelled);
1220 let state = Arc::clone(&materializer.inner.state);
1221 let materializer = cloned_materializer(materializer);
1222 let task_materializer = materializer.clone();
1223 arm_generation_failure(
1224 &materializer,
1225 Arc::clone(&producer_shared),
1226 timeout,
1227 "No elements passed before the idle timeout elapsed.",
1228 0,
1229 false,
1230 );
1231 let completion = materializer.spawn_stream(move |_| {
1232 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1233 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1234 loop {
1235 if cancelled.load(Ordering::SeqCst) {
1236 panic_guard.disarm();
1237 return Ok(NotUsed);
1238 }
1239 match input.next() {
1240 Some(Ok(item)) => {
1241 let mut state = producer_shared
1242 .state
1243 .lock()
1244 .unwrap_or_else(|poison| poison.into_inner());
1245 while state.slot.is_some()
1246 && state.terminal.is_none()
1247 && !cancelled.load(Ordering::SeqCst)
1248 {
1249 state = producer_shared
1250 .available
1251 .wait(state)
1252 .unwrap_or_else(|poison| poison.into_inner());
1253 }
1254 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1255 panic_guard.disarm();
1256 return Ok(NotUsed);
1257 }
1258 state.slot = Some(item);
1259 state.extra.generation = state.extra.generation.wrapping_add(1);
1260 let generation = state.extra.generation;
1261 drop(state);
1262 arm_generation_failure(
1263 &task_materializer,
1264 Arc::clone(&producer_shared),
1265 timeout,
1266 "No elements passed before the idle timeout elapsed.",
1267 generation,
1268 false,
1269 );
1270 producer_shared.available.notify_all();
1271 }
1272 Some(Err(error)) => {
1273 panic_guard.disarm();
1274 finish_slot(&producer_shared, TerminalSignal::Error(error));
1275 return Ok(NotUsed);
1276 }
1277 None => {
1278 panic_guard.disarm();
1279 finish_slot(&producer_shared, TerminalSignal::Complete);
1280 return Ok(NotUsed);
1281 }
1282 }
1283 }
1284 });
1285
1286 Ok(Box::new(SlotStream {
1287 shared,
1288 completion: Some(completion),
1289 }))
1290}
1291
1292fn backpressure_timeout_stage<Out: Send + 'static>(
1293 input: BoxStream<Out>,
1294 timeout: Duration,
1295 materializer: &Materializer,
1296) -> StreamResult<BoxStream<Out>> {
1297 assert!(
1298 timeout > Duration::ZERO,
1299 "backpressure_timeout timeout must be greater than zero"
1300 );
1301 let shared = SlotShared::new(ForwardExtra { generation: 0 });
1302 let producer_shared = Arc::clone(&shared);
1303 let cancelled = Arc::clone(&shared.cancelled);
1304 let state = Arc::clone(&materializer.inner.state);
1305 let materializer = cloned_materializer(materializer);
1306 let task_materializer = materializer.clone();
1307 let completion = materializer.spawn_stream(move |_| {
1308 let schedule_timeout = |generation: u64, shared: Arc<SlotShared<Out, ForwardExtra>>| {
1309 let _timer = task_materializer.schedule_once(timeout, move || {
1310 let mut state = shared
1311 .state
1312 .lock()
1313 .unwrap_or_else(|poison| poison.into_inner());
1314 if state.terminal.is_some() || state.extra.generation != generation {
1315 return;
1316 }
1317 state.slot = None;
1318 state.terminal = Some(TerminalSignal::Error(StreamError::Failed(
1319 "No downstream demand signalled before the backpressure timeout elapsed."
1320 .to_owned(),
1321 )));
1322 drop(state);
1323 shared.cancelled.store(true, Ordering::SeqCst);
1324 shared.available.notify_all();
1325 });
1326 };
1327 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1328 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1329 loop {
1330 if cancelled.load(Ordering::SeqCst) {
1331 panic_guard.disarm();
1332 return Ok(NotUsed);
1333 }
1334 match input.next() {
1335 Some(Ok(item)) => {
1336 let mut state = producer_shared
1337 .state
1338 .lock()
1339 .unwrap_or_else(|poison| poison.into_inner());
1340 while state.slot.is_some()
1341 && state.terminal.is_none()
1342 && !cancelled.load(Ordering::SeqCst)
1343 {
1344 state = producer_shared
1345 .available
1346 .wait(state)
1347 .unwrap_or_else(|poison| poison.into_inner());
1348 }
1349 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1350 panic_guard.disarm();
1351 return Ok(NotUsed);
1352 }
1353 state.slot = Some(item);
1354 state.extra.generation = state.extra.generation.wrapping_add(1);
1355 let generation = state.extra.generation;
1356 drop(state);
1357 schedule_timeout(generation, Arc::clone(&producer_shared));
1358 producer_shared.available.notify_all();
1359 }
1360 Some(Err(error)) => {
1361 panic_guard.disarm();
1362 finish_slot(&producer_shared, TerminalSignal::Error(error));
1363 return Ok(NotUsed);
1364 }
1365 None => {
1366 panic_guard.disarm();
1367 finish_slot(&producer_shared, TerminalSignal::Complete);
1368 return Ok(NotUsed);
1369 }
1370 }
1371 }
1372 });
1373
1374 struct BackpressureStream<Out> {
1375 shared: Arc<SlotShared<Out, ForwardExtra>>,
1376 completion: Option<StreamCompletion<NotUsed>>,
1377 }
1378
1379 impl<Out> Iterator for BackpressureStream<Out> {
1380 type Item = StreamResult<Out>;
1381
1382 fn next(&mut self) -> Option<Self::Item> {
1383 let mut state = self
1384 .shared
1385 .state
1386 .lock()
1387 .unwrap_or_else(|poison| poison.into_inner());
1388 loop {
1389 if let Some(item) = state.slot.take() {
1390 state.extra.generation = state.extra.generation.wrapping_add(1);
1391 drop(state);
1392 self.shared.available.notify_all();
1393 return Some(Ok(item));
1394 }
1395 if let Some(terminal) = state.terminal.clone() {
1396 return match terminal {
1397 TerminalSignal::Complete => None,
1398 TerminalSignal::Error(error) => Some(Err(error)),
1399 };
1400 }
1401 if self.shared.cancelled.load(Ordering::SeqCst) {
1402 return Some(Err(StreamError::Cancelled));
1403 }
1404 let (next, _) = self
1405 .shared
1406 .available
1407 .wait_timeout(state, WAIT_POLL_INTERVAL)
1408 .unwrap_or_else(|poison| poison.into_inner());
1409 state = next;
1410 }
1411 }
1412 }
1413
1414 impl<Out> Drop for BackpressureStream<Out> {
1415 fn drop(&mut self) {
1416 self.shared.cancelled.store(true, Ordering::SeqCst);
1417 self.shared.available.notify_all();
1418 let _ = self.completion.take();
1419 }
1420 }
1421
1422 Ok(Box::new(BackpressureStream {
1423 shared,
1424 completion: Some(completion),
1425 }))
1426}
1427
1428struct KeepAliveExtra {
1429 generation: u64,
1430}
1431
1432fn arm_keep_alive_timer<Out, Inject>(
1433 materializer: &Materializer,
1434 shared: Arc<SlotShared<Out, KeepAliveExtra>>,
1435 timeout: Duration,
1436 generation: u64,
1437 inject: Arc<Inject>,
1438) where
1439 Out: Send + 'static,
1440 Inject: Fn() -> Out + Send + Sync + 'static,
1441{
1442 let materializer = cloned_materializer(materializer);
1443 let task_materializer = materializer.clone();
1444 let _timer = materializer.schedule_once(timeout, move || {
1445 let slot_occupied = {
1446 let state = shared
1447 .state
1448 .lock()
1449 .unwrap_or_else(|poison| poison.into_inner());
1450 if state.terminal.is_some() || state.extra.generation != generation {
1451 return;
1452 }
1453 state.slot.is_some()
1454 };
1455 if slot_occupied {
1456 arm_keep_alive_timer(
1457 &task_materializer,
1458 Arc::clone(&shared),
1459 timeout,
1460 generation,
1461 Arc::clone(&inject),
1462 );
1463 return;
1464 }
1465
1466 let injected = match catch_unwind(AssertUnwindSafe(|| inject())) {
1467 Ok(item) => item,
1468 Err(_) => {
1469 finish_slot(
1470 &shared,
1471 TerminalSignal::Error(StreamError::AbruptTermination),
1472 );
1473 shared.cancelled.store(true, Ordering::SeqCst);
1474 return;
1475 }
1476 };
1477
1478 let mut state = shared
1479 .state
1480 .lock()
1481 .unwrap_or_else(|poison| poison.into_inner());
1482 if state.terminal.is_some() || state.extra.generation != generation || state.slot.is_some()
1483 {
1484 return;
1485 }
1486 state.slot = Some(injected);
1487 state.extra.generation = state.extra.generation.wrapping_add(1);
1488 let next_generation = state.extra.generation;
1489 drop(state);
1490 shared.available.notify_all();
1491 arm_keep_alive_timer(
1492 &task_materializer,
1493 Arc::clone(&shared),
1494 timeout,
1495 next_generation,
1496 Arc::clone(&inject),
1497 );
1498 });
1499}
1500
1501fn keep_alive_stage<Out, Inject>(
1502 input: BoxStream<Out>,
1503 timeout: Duration,
1504 inject: Arc<Inject>,
1505 materializer: &Materializer,
1506) -> StreamResult<BoxStream<Out>>
1507where
1508 Out: Send + 'static,
1509 Inject: Fn() -> Out + Send + Sync + 'static,
1510{
1511 assert!(
1512 timeout > Duration::ZERO,
1513 "keep_alive timeout must be greater than zero"
1514 );
1515 let shared = SlotShared::new(KeepAliveExtra { generation: 0 });
1516 let producer_shared = Arc::clone(&shared);
1517 let cancelled = Arc::clone(&shared.cancelled);
1518 let state = Arc::clone(&materializer.inner.state);
1519 let materializer = cloned_materializer(materializer);
1520 let task_materializer = materializer.clone();
1521 arm_keep_alive_timer(
1522 &materializer,
1523 Arc::clone(&producer_shared),
1524 timeout,
1525 0,
1526 Arc::clone(&inject),
1527 );
1528 let completion = materializer.spawn_stream(move |_| {
1529 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1530 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1531 loop {
1532 if cancelled.load(Ordering::SeqCst) {
1533 panic_guard.disarm();
1534 return Ok(NotUsed);
1535 }
1536 match input.next() {
1537 Some(Ok(item)) => {
1538 let mut state = producer_shared
1539 .state
1540 .lock()
1541 .unwrap_or_else(|poison| poison.into_inner());
1542 while state.slot.is_some()
1543 && state.terminal.is_none()
1544 && !cancelled.load(Ordering::SeqCst)
1545 {
1546 state = producer_shared
1547 .available
1548 .wait(state)
1549 .unwrap_or_else(|poison| poison.into_inner());
1550 }
1551 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1552 panic_guard.disarm();
1553 return Ok(NotUsed);
1554 }
1555 state.slot = Some(item);
1556 state.extra.generation = state.extra.generation.wrapping_add(1);
1557 let generation = state.extra.generation;
1558 drop(state);
1559 producer_shared.available.notify_all();
1560 arm_keep_alive_timer(
1561 &task_materializer,
1562 Arc::clone(&producer_shared),
1563 timeout,
1564 generation,
1565 Arc::clone(&inject),
1566 );
1567 }
1568 Some(Err(error)) => {
1569 panic_guard.disarm();
1570 finish_slot(&producer_shared, TerminalSignal::Error(error));
1571 return Ok(NotUsed);
1572 }
1573 None => {
1574 panic_guard.disarm();
1575 finish_slot(&producer_shared, TerminalSignal::Complete);
1576 return Ok(NotUsed);
1577 }
1578 }
1579 }
1580 });
1581
1582 Ok(Box::new(SlotStream {
1583 shared,
1584 completion: Some(completion),
1585 }))
1586}
1587
1588fn drop_within_stage<Out: Send + 'static>(
1589 input: BoxStream<Out>,
1590 timeout: Duration,
1591 materializer: &Materializer,
1592) -> StreamResult<BoxStream<Out>> {
1593 assert!(
1594 timeout > Duration::ZERO,
1595 "drop_within timeout must be greater than zero"
1596 );
1597 struct DropWithinExtra;
1598 let shared = SlotShared::new(DropWithinExtra);
1599 let producer_shared = Arc::clone(&shared);
1600 let cancelled = Arc::clone(&shared.cancelled);
1601 let state = Arc::clone(&materializer.inner.state);
1602 let materializer = cloned_materializer(materializer);
1603 let deadline = Instant::now() + timeout;
1604 let completion = materializer.spawn_stream(move |_| {
1605 let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1606 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1607 loop {
1608 if cancelled.load(Ordering::SeqCst) {
1609 panic_guard.disarm();
1610 return Ok(NotUsed);
1611 }
1612 match input.next() {
1613 Some(Ok(item)) => {
1614 if Instant::now() < deadline {
1615 continue;
1616 }
1617 let mut state = producer_shared
1618 .state
1619 .lock()
1620 .unwrap_or_else(|poison| poison.into_inner());
1621 while state.slot.is_some()
1622 && state.terminal.is_none()
1623 && !cancelled.load(Ordering::SeqCst)
1624 {
1625 state = producer_shared
1626 .available
1627 .wait(state)
1628 .unwrap_or_else(|poison| poison.into_inner());
1629 }
1630 if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1631 panic_guard.disarm();
1632 return Ok(NotUsed);
1633 }
1634 state.slot = Some(item);
1635 drop(state);
1636 producer_shared.available.notify_all();
1637 }
1638 Some(Err(error)) => {
1639 panic_guard.disarm();
1640 finish_slot(&producer_shared, TerminalSignal::Error(error));
1641 return Ok(NotUsed);
1642 }
1643 None => {
1644 panic_guard.disarm();
1645 finish_slot(&producer_shared, TerminalSignal::Complete);
1646 return Ok(NotUsed);
1647 }
1648 }
1649 }
1650 });
1651
1652 Ok(Box::new(SlotStream {
1653 shared,
1654 completion: Some(completion),
1655 }))
1656}
1657
1658impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
1659 pub fn throttle(
1660 self,
1661 elements: u64,
1662 per: Duration,
1663 maximum_burst: i32,
1664 mode: ThrottleMode,
1665 ) -> Flow<In, Out, Mat> {
1666 self.throttle_with_cost(elements, per, maximum_burst, |_| 1, mode)
1667 }
1668
1669 pub fn throttle_with_cost<CostFn>(
1670 self,
1671 cost: u64,
1672 per: Duration,
1673 maximum_burst: i32,
1674 cost_fn: CostFn,
1675 mode: ThrottleMode,
1676 ) -> Flow<In, Out, Mat>
1677 where
1678 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1679 {
1680 let cost_fn = Arc::new(cost_fn);
1681 self.via(Flow::from_runtime_transform(move |input, materializer| {
1682 let materializer = cloned_materializer(materializer);
1683 Ok(Box::new(ThrottleStream {
1684 input,
1685 materializer,
1686 token_bucket: TokenBucket::new(cost, per, maximum_burst),
1687 cost_fn: Arc::clone(&cost_fn),
1688 mode,
1689 terminal: None,
1690 }))
1691 }))
1692 }
1693
1694 pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Flow<In, Out, Mat> {
1695 self.delay_with(move || move |_: &Out| delay, strategy)
1696 }
1697
1698 pub fn delay_with<Supplier, Strategy>(
1699 self,
1700 delay_strategy_supplier: Supplier,
1701 overflow_strategy: DelayOverflowStrategy,
1702 ) -> Flow<In, Out, Mat>
1703 where
1704 Supplier: Fn() -> Strategy + Send + Sync + 'static,
1705 Strategy: FnMut(&Out) -> Duration + Send + 'static,
1706 {
1707 let delay_strategy_supplier = Arc::new(delay_strategy_supplier);
1708 self.via(Flow::from_runtime_transform(move |input, materializer| {
1709 delay_stage(
1710 input,
1711 Arc::clone(&delay_strategy_supplier),
1712 overflow_strategy,
1713 materializer,
1714 )
1715 }))
1716 }
1717
1718 pub fn initial_delay(self, delay: Duration) -> Flow<In, Out, Mat> {
1719 self.via(Flow::from_runtime_transform(move |input, materializer| {
1720 initial_delay_stage(input, delay, materializer)
1721 }))
1722 }
1723
1724 pub fn grouped_within(self, max_number: usize, interval: Duration) -> Flow<In, Vec<Out>, Mat> {
1725 let unit_cost = Arc::new(|_: &Out| 1_u64);
1726 self.via(Flow::from_runtime_transform(move |input, materializer| {
1727 grouped_weighted_within_stage(
1728 input,
1729 max_number as u64,
1730 max_number,
1731 interval,
1732 Arc::clone(&unit_cost),
1733 materializer,
1734 )
1735 }))
1736 }
1737
1738 pub fn grouped_weighted_within<CostFn>(
1739 self,
1740 max_weight: u64,
1741 interval: Duration,
1742 cost_fn: CostFn,
1743 ) -> Flow<In, Vec<Out>, Mat>
1744 where
1745 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1746 {
1747 let cost_fn = Arc::new(cost_fn);
1748 self.via(Flow::from_runtime_transform(move |input, materializer| {
1749 grouped_weighted_within_stage(
1750 input,
1751 max_weight,
1752 usize::MAX,
1753 interval,
1754 Arc::clone(&cost_fn),
1755 materializer,
1756 )
1757 }))
1758 }
1759
1760 pub fn drop_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
1761 self.via(Flow::from_runtime_transform(move |input, materializer| {
1762 drop_within_stage(input, timeout, materializer)
1763 }))
1764 }
1765
1766 pub fn take_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
1767 self.via(Flow::from_runtime_transform(move |input, materializer| {
1768 take_within_stage(input, timeout, materializer)
1769 }))
1770 }
1771
1772 pub fn idle_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1773 self.via(Flow::from_runtime_transform(move |input, materializer| {
1774 idle_timeout_stage(input, timeout, materializer)
1775 }))
1776 }
1777
1778 pub fn backpressure_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1779 self.via(Flow::from_runtime_transform(move |input, materializer| {
1780 backpressure_timeout_stage(input, timeout, materializer)
1781 }))
1782 }
1783
1784 pub fn completion_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1785 self.via(Flow::from_runtime_transform(move |input, materializer| {
1786 completion_timeout_stage(input, timeout, materializer)
1787 }))
1788 }
1789
1790 pub fn initial_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1791 self.via(Flow::from_runtime_transform(move |input, materializer| {
1792 initial_timeout_stage(input, timeout, materializer)
1793 }))
1794 }
1795
1796 pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Flow<In, Out, Mat>
1797 where
1798 Inject: Fn() -> Out + Send + Sync + 'static,
1799 {
1800 let inject = Arc::new(inject);
1801 self.via(Flow::from_runtime_transform(move |input, materializer| {
1802 keep_alive_stage(input, timeout, Arc::clone(&inject), materializer)
1803 }))
1804 }
1805}
1806
1807impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
1808 pub fn throttle(
1809 self,
1810 elements: u64,
1811 per: Duration,
1812 maximum_burst: i32,
1813 mode: ThrottleMode,
1814 ) -> Self {
1815 self.via(Flow::identity().throttle(elements, per, maximum_burst, mode))
1816 }
1817
1818 pub fn throttle_with_cost<CostFn>(
1819 self,
1820 cost: u64,
1821 per: Duration,
1822 maximum_burst: i32,
1823 cost_fn: CostFn,
1824 mode: ThrottleMode,
1825 ) -> Self
1826 where
1827 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1828 {
1829 self.via(Flow::identity().throttle_with_cost(cost, per, maximum_burst, cost_fn, mode))
1830 }
1831
1832 pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Self {
1833 self.via(Flow::identity().delay(delay, strategy))
1834 }
1835
1836 pub fn delay_with<Supplier, Strategy>(
1837 self,
1838 delay_strategy_supplier: Supplier,
1839 overflow_strategy: DelayOverflowStrategy,
1840 ) -> Self
1841 where
1842 Supplier: Fn() -> Strategy + Send + Sync + 'static,
1843 Strategy: FnMut(&Out) -> Duration + Send + 'static,
1844 {
1845 self.via(Flow::identity().delay_with(delay_strategy_supplier, overflow_strategy))
1846 }
1847
1848 pub fn initial_delay(self, delay: Duration) -> Self {
1849 self.via(Flow::identity().initial_delay(delay))
1850 }
1851
1852 pub fn grouped_within(self, max_number: usize, interval: Duration) -> Source<Vec<Out>, Mat> {
1853 self.via(Flow::identity().grouped_within(max_number, interval))
1854 }
1855
1856 pub fn grouped_weighted_within<CostFn>(
1857 self,
1858 max_weight: u64,
1859 interval: Duration,
1860 cost_fn: CostFn,
1861 ) -> Source<Vec<Out>, Mat>
1862 where
1863 CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1864 {
1865 self.via(Flow::identity().grouped_weighted_within(max_weight, interval, cost_fn))
1866 }
1867
1868 pub fn drop_within(self, timeout: Duration) -> Self {
1869 self.via(Flow::identity().drop_within(timeout))
1870 }
1871
1872 pub fn take_within(self, timeout: Duration) -> Self {
1873 self.via(Flow::identity().take_within(timeout))
1874 }
1875
1876 pub fn idle_timeout(self, timeout: Duration) -> Self {
1877 self.via(Flow::identity().idle_timeout(timeout))
1878 }
1879
1880 pub fn backpressure_timeout(self, timeout: Duration) -> Self {
1881 self.via(Flow::identity().backpressure_timeout(timeout))
1882 }
1883
1884 pub fn completion_timeout(self, timeout: Duration) -> Self {
1885 self.via(Flow::identity().completion_timeout(timeout))
1886 }
1887
1888 pub fn initial_timeout(self, timeout: Duration) -> Self {
1889 self.via(Flow::identity().initial_timeout(timeout))
1890 }
1891
1892 pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Self
1893 where
1894 Inject: Fn() -> Out + Send + Sync + 'static,
1895 {
1896 self.via(Flow::identity().keep_alive(timeout, inject))
1897 }
1898}
1899
1900impl<Out: Clone + Send + Sync + 'static> Source<Out, Cancellable> {
1901 pub fn tick(initial_delay: Duration, interval: Duration, element: Out) -> Self {
1902 assert!(
1903 interval > Duration::ZERO,
1904 "tick interval must be greater than zero"
1905 );
1906 Source::from_materialized_factory(move |materializer| {
1907 struct TickState {
1908 pending: bool,
1909 closed: bool,
1910 }
1911
1912 let shared = Arc::new((
1913 Mutex::new(TickState {
1914 pending: false,
1915 closed: false,
1916 }),
1917 Condvar::new(),
1918 ));
1919 let keep_alive: Arc<dyn Send + Sync> =
1920 Arc::clone(&materializer.inner) as Arc<dyn Send + Sync>;
1921 let cancellable = Cancellable::new_with_keep_alive(Some(keep_alive));
1922 let cancelled = Arc::clone(&cancellable.cancelled);
1923 let shutdown = Arc::clone(&materializer.inner.state.shutdown);
1924 let shared_task = Arc::clone(&shared);
1925 let timer =
1926 materializer.schedule_with_fixed_delay(initial_delay, interval, move || {
1927 if cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst) {
1928 return;
1929 }
1930 let (lock, condvar) = &*shared_task;
1931 let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1932 if !state.closed {
1933 state.pending = true;
1934 }
1935 drop(state);
1936 condvar.notify_all();
1937 });
1938
1939 struct TickStream<Out> {
1940 shared: Arc<(Mutex<TickState>, Condvar)>,
1941 timer: Cancellable,
1942 external: Cancellable,
1943 element: Out,
1944 shutdown: Arc<AtomicBool>,
1945 }
1946
1947 impl<Out: Clone> Iterator for TickStream<Out> {
1948 type Item = StreamResult<Out>;
1949
1950 fn next(&mut self) -> Option<Self::Item> {
1951 let (lock, condvar) = &*self.shared;
1952 let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1953 loop {
1954 if self.external.is_cancelled() || self.shutdown.load(Ordering::SeqCst) {
1955 state.closed = true;
1956 self.timer.cancel();
1957 return None;
1958 }
1959 if state.pending {
1960 state.pending = false;
1961 return Some(Ok(self.element.clone()));
1962 }
1963 let (next, _) = condvar
1964 .wait_timeout(state, WAIT_POLL_INTERVAL)
1965 .unwrap_or_else(|poison| poison.into_inner());
1966 state = next;
1967 }
1968 }
1969 }
1970
1971 impl<Out> Drop for TickStream<Out> {
1972 fn drop(&mut self) {
1973 let (lock, condvar) = &*self.shared;
1974 let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1975 state.closed = true;
1976 drop(state);
1977 self.timer.cancel();
1978 condvar.notify_all();
1979 }
1980 }
1981
1982 Ok((
1983 Box::new(TickStream {
1984 shared,
1985 timer,
1986 external: cancellable.clone(),
1987 element: element.clone(),
1988 shutdown: Arc::clone(&materializer.inner.state.shutdown),
1989 }) as BoxStream<Out>,
1990 cancellable,
1991 ))
1992 })
1993 }
1994}
1995
1996#[cfg(test)]
1997mod tests {
1998 use super::*;
1999 use crate::testkit::{TestSink, TestSource};
2000 use std::sync::mpsc;
2001 use std::thread;
2002
2003 const LOAD_TIMEOUT: Duration = Duration::from_millis(100);
2004 const LOAD_GAP: Duration = Duration::from_millis(250);
2005
2006 fn materialize_stream<T: Send + 'static, Mat: Send + 'static>(
2007 source: Source<T, Mat>,
2008 ) -> (BoxStream<T>, Materializer, Mat) {
2009 let materializer = Materializer::new();
2010 let (stream, mat) = Arc::clone(&source.factory)
2011 .create(&materializer)
2012 .expect("stream materializes");
2013 (stream, materializer, mat)
2014 }
2015
2016 #[test]
2017 fn throttle_shaping_spaces_elements() {
2018 let (tx, rx) = mpsc::channel();
2019 Source::from_iter(1..=3)
2020 .map(move |_| {
2021 tx.send(Instant::now()).unwrap();
2022 })
2023 .throttle(1, Duration::from_millis(40), 0, ThrottleMode::Shaping)
2024 .run_with(Sink::ignore())
2025 .unwrap()
2026 .wait()
2027 .unwrap();
2028
2029 let first = rx.recv_timeout(Duration::from_millis(250)).unwrap();
2030 let second = rx.recv_timeout(Duration::from_millis(250)).unwrap();
2031 let third = rx.recv_timeout(Duration::from_millis(250)).unwrap();
2032 assert!(second.duration_since(first) >= Duration::from_millis(20));
2033 assert!(third.duration_since(second) >= Duration::from_millis(20));
2034 }
2035
2036 #[test]
2037 fn throttle_enforcing_fails_when_rate_exceeded() {
2038 let result = Source::from_iter(1..=3)
2039 .throttle(1, Duration::from_millis(80), 1, ThrottleMode::Enforcing)
2040 .run_collect();
2041 assert_eq!(
2042 result,
2043 Err(StreamError::Failed(
2044 "Maximum throttle throughput exceeded.".to_owned()
2045 ))
2046 );
2047 }
2048
2049 #[test]
2050 fn delay_delays_first_element_without_reordering() {
2051 let start = Instant::now();
2052 let items = Source::from_iter([1, 2])
2053 .delay(
2054 Duration::from_millis(40),
2055 DelayOverflowStrategy::Backpressure,
2056 )
2057 .run_collect()
2058 .unwrap();
2059 assert_eq!(items, vec![1, 2]);
2060 assert!(start.elapsed() >= Duration::from_millis(20));
2061 }
2062
2063 #[test]
2064 fn delay_emit_early_preserves_elements_when_buffer_not_full() {
2065 let start = Instant::now();
2066 let items = Source::from_iter([1, 2])
2067 .delay(Duration::from_millis(40), DelayOverflowStrategy::EmitEarly)
2068 .run_collect()
2069 .unwrap();
2070 assert_eq!(items, vec![1, 2]);
2071 assert!(start.elapsed() >= Duration::from_millis(20));
2072 }
2073
2074 #[test]
2075 fn delay_emit_early_flushes_oldest_when_buffer_full() {
2076 let delay = Duration::from_secs(2);
2077 let (mut stream, materializer, _mat) = materialize_stream(
2078 Source::from_iter(1..=17).delay(delay, DelayOverflowStrategy::EmitEarly),
2079 );
2080
2081 let start = Instant::now();
2082 assert_eq!(stream.next(), Some(Ok(1)));
2083 assert!(
2084 start.elapsed() < Duration::from_millis(750),
2085 "EmitEarly should flush well before the configured delay under load"
2086 );
2087
2088 let mut items = vec![1];
2089 for item in stream {
2090 items.push(item.unwrap());
2091 }
2092 assert_eq!(items, (1..=17).collect::<Vec<_>>());
2093 materializer.shutdown();
2094 }
2095
2096 #[test]
2097 fn initial_delay_holds_back_first_pull() {
2098 let start = Instant::now();
2099 let result = Source::single(42)
2100 .initial_delay(Duration::from_millis(40))
2101 .run_with(Sink::head())
2102 .unwrap();
2103 assert_eq!(result.wait().unwrap(), 42);
2104 assert!(start.elapsed() >= Duration::from_millis(20));
2105 }
2106
2107 #[test]
2108 fn grouped_within_flushes_on_timer() {
2109 let (publisher, subscriber) = TestSource::probe::<i32>()
2110 .grouped_within(8, Duration::from_millis(40))
2111 .to_mat(TestSink::probe(), Keep::both)
2112 .run()
2113 .unwrap();
2114
2115 publisher.expect_request();
2116 publisher.send_next(1);
2117 publisher.expect_request();
2118 publisher.send_next(2);
2119
2120 subscriber.request(1);
2121 assert_eq!(subscriber.expect_next(), vec![1, 2]);
2122 }
2123
2124 #[test]
2125 fn grouped_weighted_within_flushes_on_weight() {
2126 let result = Source::from_iter(["aa", "bbb", "c"])
2127 .grouped_weighted_within(4, Duration::from_secs(1), |s| s.len() as u64)
2128 .run_collect()
2129 .unwrap();
2130 assert_eq!(result, vec![vec!["aa"], vec!["bbb", "c"]]);
2131 }
2132
2133 #[test]
2134 fn drop_within_drops_early_elements() {
2135 let (publisher, subscriber) = TestSource::probe::<i32>()
2136 .drop_within(Duration::from_millis(40))
2137 .to_mat(TestSink::probe(), Keep::both)
2138 .run()
2139 .unwrap();
2140
2141 publisher.expect_request();
2142 publisher.send_next(1);
2143 thread::sleep(Duration::from_millis(120));
2144 publisher.expect_request();
2145 publisher.send_next(2);
2146 publisher.send_complete();
2147
2148 subscriber.request(2);
2149 assert_eq!(subscriber.expect_next(), 2);
2150 subscriber.expect_complete();
2151 }
2152
2153 #[test]
2154 fn take_within_completes_after_timeout() {
2155 let (publisher, subscriber) = TestSource::probe::<i32>()
2156 .take_within(Duration::from_millis(40))
2157 .to_mat(TestSink::probe(), Keep::both)
2158 .run()
2159 .unwrap();
2160
2161 publisher.expect_request();
2162 publisher.send_next(1);
2163 subscriber.request(2);
2164 assert_eq!(subscriber.expect_next(), 1);
2165 subscriber.expect_complete();
2166 }
2167
2168 #[test]
2169 fn idle_timeout_fails_on_gap() {
2170 let (publisher, subscriber) = TestSource::probe::<i32>()
2171 .idle_timeout(Duration::from_millis(40))
2172 .to_mat(TestSink::probe(), Keep::both)
2173 .run()
2174 .unwrap();
2175
2176 publisher.expect_request();
2177 publisher.send_next(1);
2178 subscriber.request(2);
2179 assert_eq!(subscriber.expect_next(), 1);
2180 assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
2181 }
2182
2183 #[test]
2184 fn backpressure_timeout_fails_without_demand() {
2185 let (publisher, subscriber) = TestSource::probe::<i32>()
2186 .backpressure_timeout(LOAD_TIMEOUT)
2187 .to_mat(TestSink::probe(), Keep::both)
2188 .run()
2189 .unwrap();
2190
2191 subscriber.request(1);
2192 publisher.expect_request();
2193 publisher.send_next(1);
2194 assert_eq!(subscriber.expect_next(), 1);
2195 publisher.expect_request();
2196 publisher.send_next(2);
2197 thread::sleep(LOAD_GAP);
2198 subscriber.request(1);
2199 assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
2200 }
2201
2202 #[test]
2203 fn completion_timeout_fails_unfinished_stream() {
2204 let result = Source::<i32>::never()
2205 .completion_timeout(Duration::from_millis(40))
2206 .run_collect();
2207 assert!(matches!(result, Err(StreamError::Failed(_))));
2208 }
2209
2210 #[test]
2211 fn initial_timeout_fails_before_first_element() {
2212 let (mut stream, materializer, _mat) =
2213 materialize_stream(Source::<i32>::never().initial_timeout(Duration::from_millis(40)));
2214 assert!(matches!(stream.next(), Some(Err(StreamError::Failed(_)))));
2215 materializer.shutdown();
2216 }
2217
2218 #[test]
2219 fn keep_alive_injects_on_idle_gap() {
2220 let (publisher, subscriber) = TestSource::probe::<i32>()
2221 .keep_alive(LOAD_TIMEOUT, || 0)
2222 .to_mat(TestSink::probe(), Keep::both)
2223 .run()
2224 .unwrap();
2225
2226 subscriber.request(3);
2227 publisher.expect_request();
2228 publisher.send_next(1);
2229 assert_eq!(subscriber.expect_next(), 1);
2230 assert_eq!(subscriber.expect_next(), 0);
2231 }
2232
2233 #[test]
2234 fn keep_alive_rearms_after_slow_consumer_drains_slot() {
2235 let (publisher, subscriber) = TestSource::probe::<i32>()
2236 .keep_alive(LOAD_TIMEOUT, || 0)
2237 .to_mat(TestSink::probe(), Keep::both)
2238 .run()
2239 .unwrap();
2240
2241 subscriber.request(1);
2242 publisher.expect_request();
2243 publisher.send_next(1);
2244 assert_eq!(subscriber.expect_next(), 1);
2245
2246 publisher.expect_request();
2247 publisher.send_next(2);
2248 thread::sleep(LOAD_GAP);
2249
2250 subscriber.request(1);
2251 assert_eq!(subscriber.expect_next(), 2);
2252
2253 thread::sleep(LOAD_GAP);
2254 subscriber.request(1);
2255 assert_eq!(subscriber.expect_next(), 0);
2256 }
2257
2258 #[test]
2259 #[should_panic(expected = "maximum_burst must be -1 or greater")]
2260 fn throttle_rejects_invalid_negative_maximum_burst() {
2261 let _ = Source::single(1)
2262 .throttle(1, Duration::from_millis(40), -2, ThrottleMode::Shaping)
2263 .run_collect();
2264 }
2265
2266 #[test]
2267 fn tick_drops_missed_ticks_and_cancels() {
2268 let (mut stream, materializer, _cancellable) = materialize_stream(Source::tick(
2269 Duration::from_millis(20),
2270 Duration::from_millis(20),
2271 7_i32,
2272 ));
2273
2274 thread::sleep(Duration::from_millis(100));
2275 assert_eq!(stream.next(), Some(Ok(7)));
2276 assert_eq!(stream.next(), Some(Ok(7)));
2277 materializer.shutdown();
2278 assert_eq!(stream.next(), None);
2279 }
2280}