use async_stream::stream;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures_core::Stream;
use futures_util::pin_mut;
use futures_util::{
future,
stream::{self, BoxStream, FuturesUnordered, StreamExt},
SinkExt,
};
use std::future::Future;
use std::time::Duration;
use std::sync::Arc;
use tokio::{spawn, time::sleep};
use tokio::sync::Mutex;
use crate::error::{StreamError, StreamResult, RetryPolicy};
use crate::stream_performance_metrics::{HealthThresholds, StreamMetrics};
/// Boxed, `Send`, `'static` stream of `O` values.
///
/// This is the stream type accepted and returned by the combinators in this module.
pub type RS2Stream<O> = BoxStream<'static, O>;
/// Policy used by `auto_backpressure` to pick a buffering implementation
/// when the producer outpaces the consumer.
#[derive(Debug, Clone, Copy)]
pub enum BackpressureStrategy {
/// Evict the oldest buffered item to make room for the incoming one
/// (see `auto_backpressure_drop_oldest`).
DropOldest,
/// Discard the incoming item while the buffer is full
/// (see `auto_backpressure_drop_newest`).
DropNewest,
/// Make the producer wait until the consumer frees buffer space
/// (see `auto_backpressure_block`).
Block,
/// Dispatched to `auto_backpressure_error`.
/// NOTE(review): that function currently waits for buffer space just like
/// `Block`; it never surfaces an error to the consumer.
Error,
}
/// Settings consumed by `auto_backpressure`.
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Which buffering policy to apply.
    pub strategy: BackpressureStrategy,
    /// Capacity handed to the selected buffering implementation.
    pub buffer_size: usize,
    /// Lower watermark.
    /// NOTE(review): not read by `auto_backpressure` in this file — confirm intended use.
    pub low_watermark: Option<usize>,
    /// Upper watermark.
    /// NOTE(review): not read by `auto_backpressure` in this file — confirm intended use.
    pub high_watermark: Option<usize>,
}
impl Default for BackpressureConfig {
    /// Blocking strategy with a 100-item buffer and watermarks at 25 / 75.
    fn default() -> Self {
        let buffer_size = 100;
        let (low_watermark, high_watermark) = (Some(25), Some(75));
        Self {
            strategy: BackpressureStrategy::Block,
            buffer_size,
            low_watermark,
            high_watermark,
        }
    }
}
/// Outcome handed to the `release` callback of `bracket_case`.
#[derive(Debug, Clone)]
pub enum ExitCase<E> {
/// The guarded stream ran to its end.
Completed,
/// The guarded stream ended because of the carried error.
Errored(E),
}
/// Builds a stream that yields `item` exactly once and then ends.
pub fn emit<O>(item: O) -> RS2Stream<O>
where
    O: Send + 'static,
{
    let ready_item = future::ready(item);
    stream::once(ready_item).boxed()
}
/// Builds a stream that ends immediately without yielding anything.
pub fn empty<O>() -> RS2Stream<O>
where
    O: Send + 'static,
{
    let nothing = stream::empty::<O>();
    nothing.boxed()
}
/// Turns any `Send` iterable into a stream that yields its items in order.
pub fn from_iter<I, O>(iter: I) -> RS2Stream<O>
where
    I: IntoIterator<Item = O> + Send + 'static,
    <I as IntoIterator>::IntoIter: Send,
    O: Send + 'static,
{
    let items = stream::iter(iter);
    items.boxed()
}
/// Wraps a future as a single-element stream: the future's output is the only item.
pub fn eval<O, F>(fut: F) -> RS2Stream<O>
where
    F: Future<Output = O> + Send + 'static,
    O: Send + 'static,
{
    let single = stream::once(fut);
    single.boxed()
}
/// Builds an endless stream that yields clones of `item`.
pub fn repeat<O>(item: O) -> RS2Stream<O>
where
    O: Clone + Send + 'static,
{
    let endless = stream::repeat(item);
    endless.boxed()
}
/// Builds a stream that waits for `duration` and then yields `item` once.
///
/// The delay starts when the stream is first polled, not when this function is called.
pub fn emit_after<O>(item: O, duration: Duration) -> RS2Stream<O>
where
    O: Send + 'static,
{
    let delayed = async move {
        sleep(duration).await;
        item
    };
    stream::once(delayed).boxed()
}
/// Generates a stream from a seed state.
///
/// `f` receives the current state and resolves to `Some((item, next_state))`
/// to emit `item` and continue, or to `None` to end the stream.
pub fn unfold<S, O, F, Fut>(init: S, mut f: F) -> RS2Stream<O>
where
    S: Send + 'static,
    O: Send + 'static,
    F: FnMut(S) -> Fut + Send + 'static,
    Fut: Future<Output = Option<(O, S)>> + Send + 'static,
{
    stream! {
        let mut state = init;
        // `state` is moved into `f` each round and re-seeded from its result.
        while let Some((item, next_state)) = f(state).await {
            yield item;
            state = next_state;
        }
    }
    .boxed()
}
/// Groups consecutive items that map to the same key.
///
/// Each run of adjacent items for which `key_fn` returns equal keys is emitted
/// as `(key, items)`. A key that reappears later, after a different key,
/// starts a new group. The last group is flushed when the source ends.
pub fn group_adjacent_by<O, K, F>(s: RS2Stream<O>, mut key_fn: F) -> RS2Stream<(K, Vec<O>)>
where
    O: Clone + Send + 'static,
    K: Eq + Clone + Send + 'static,
    F: FnMut(&O) -> K + Send + 'static,
{
    stream! {
        pin_mut!(s);
        // Invariant: `current_key` is `Some` exactly when `current_group` is non-empty.
        let mut current_key: Option<K> = None;
        let mut current_group: Vec<O> = Vec::new();
        while let Some(item) = s.next().await {
            let key = key_fn(&item);
            let same_group = current_key.as_ref() == Some(&key);
            if !same_group {
                // Swap in the new key; the previous one (if any) closes its group.
                if let Some(finished_key) = current_key.replace(key) {
                    yield (finished_key, std::mem::take(&mut current_group));
                }
            }
            current_group.push(item);
        }
        if let Some(last_key) = current_key {
            yield (last_key, current_group);
        }
    }
    .boxed()
}
pub fn take<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
where
O: Send + 'static,
{
s.take(n).boxed()
}
pub fn drop<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
where
O: Send + 'static,
{
s.skip(n).boxed()
}
/// Collects items into `Vec`s of `size` elements.
///
/// Every emitted chunk has exactly `size` items except possibly the last one,
/// which holds whatever remained when the source ended.
///
/// With `size == 0` the length check never matches, so the whole source is
/// buffered and emitted as a single chunk at the end.
pub fn chunk<O>(s: RS2Stream<O>, size: usize) -> RS2Stream<Vec<O>>
where
    O: Send + 'static,
{
    stream! {
        let mut buf = Vec::with_capacity(size);
        pin_mut!(s);
        while let Some(item) = s.next().await {
            buf.push(item);
            if buf.len() == size {
                // Hand out the full chunk and keep a pre-sized buffer for the
                // next one, so later chunks do not regrow from zero capacity.
                yield std::mem::replace(&mut buf, Vec::with_capacity(size));
            }
        }
        if !buf.is_empty() {
            yield buf;
        }
    }
    .boxed()
}
/// Applies a per-item deadline to `s`.
///
/// Each item that arrives within `duration` is yielded as `Ok`. When the wait
/// for the next item exceeds `duration`, `Err(StreamError::Timeout)` is
/// yielded and the stream keeps waiting for the next item; it only ends when
/// the source ends.
pub fn timeout<T>(s: RS2Stream<T>, duration: Duration) -> RS2Stream<StreamResult<T>>
where
    T: Send + 'static,
{
    stream! {
        let mut source = s;
        loop {
            let outcome = tokio::time::timeout(duration, source.next()).await;
            match outcome {
                Err(_elapsed) => yield Err(StreamError::Timeout),
                Ok(None) => break,
                Ok(Some(value)) => yield Ok(value),
            }
        }
    }
    .boxed()
}
/// Running fold: yields every intermediate accumulator value.
///
/// Starting from `init`, each source item is combined with the accumulator
/// via `f`, and a clone of the new accumulator is emitted. `init` itself is
/// not emitted.
pub fn scan<T, U, F>(s: RS2Stream<T>, init: U, mut f: F) -> RS2Stream<U>
where
    F: FnMut(U, T) -> U + Send + 'static,
    T: Send + 'static,
    U: Clone + Send + 'static,
{
    stream! {
        let mut acc = init;
        pin_mut!(s);
        while let Some(item) = s.next().await {
            // The accumulator is moved into `f` and replaced by its result,
            // so only the emitted copy needs a clone.
            acc = f(acc, item);
            yield acc.clone();
        }
    }
    .boxed()
}
pub fn fold<T, A, F, Fut>(s: RS2Stream<T>, init: A, mut f: F) -> impl Future<Output = A>
where
F: FnMut(A, T) -> Fut + Send + 'static,
Fut: Future<Output = A> + Send + 'static,
T: Send + 'static,
A: Send + 'static,
{
async move {
let mut acc = init;
pin_mut!(s);
while let Some(item) = s.next().await {
acc = f(acc, item).await;
}
acc
}
}
pub fn reduce<T, F, Fut>(s: RS2Stream<T>, mut f: F) -> impl Future<Output = Option<T>>
where
F: FnMut(T, T) -> Fut + Send + 'static,
Fut: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
async move {
pin_mut!(s);
let first = match s.next().await {
Some(item) => item,
None => return None, };
let mut acc = first;
while let Some(item) = s.next().await {
acc = f(acc, item).await;
}
Some(acc)
}
}
pub fn filter_map<T, U, F, Fut>(s: RS2Stream<T>, f: F) -> RS2Stream<U>
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = Option<U>> + Send + 'static,
T: Send + 'static,
U: Send + 'static,
{
s.filter_map(f).boxed()
}
/// Yields items while the async `predicate` resolves to `true`.
///
/// The stream ends at the first item for which the predicate is `false`;
/// that item is not emitted and the source is not polled again.
pub fn take_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
where
    F: FnMut(&T) -> Fut + Send + 'static,
    Fut: Future<Output = bool> + Send + 'static,
    T: Send + 'static,
{
    stream! {
        pin_mut!(s);
        loop {
            let item = match s.next().await {
                Some(item) => item,
                None => break,
            };
            let keep = predicate(&item).await;
            if !keep {
                break;
            }
            yield item;
        }
    }
    .boxed()
}
/// Discards the leading items for which the async `predicate` is `true`.
///
/// Starting with the first item the predicate rejects, every item is yielded
/// and the predicate is no longer consulted.
pub fn drop_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
where
    F: FnMut(&T) -> Fut + Send + 'static,
    Fut: Future<Output = bool> + Send + 'static,
    T: Send + 'static,
{
    stream! {
        pin_mut!(s);
        let mut dropping = true;
        while let Some(item) = s.next().await {
            if dropping {
                if predicate(&item).await {
                    continue;
                }
                // First rejected item: everything from here on passes through.
                dropping = false;
            }
            yield item;
        }
    }
    .boxed()
}
/// Groups runs of consecutive items sharing a key.
///
/// Only *adjacent* items with equal keys are grouped: each run is emitted as
/// `(key, items)`, and a key that shows up again after a different key
/// produces a separate group. The final run is flushed when the source ends.
pub fn group_by<T, K, F>(s: RS2Stream<T>, mut key_fn: F) -> RS2Stream<(K, Vec<T>)>
where
    T: Clone + Send + 'static,
    K: Eq + Clone + Send + 'static,
    F: FnMut(&T) -> K + Send + 'static,
{
    stream! {
        pin_mut!(s);
        // Invariant: `run_key` is `Some` exactly when `run` is non-empty.
        let mut run_key: Option<K> = None;
        let mut run: Vec<T> = Vec::new();
        while let Some(item) = s.next().await {
            let key = key_fn(&item);
            let continues_run = match &run_key {
                Some(existing) => *existing == key,
                None => false,
            };
            if !continues_run {
                // Install the new key; the previous key (if any) closes its run.
                if let Some(closed_key) = run_key.replace(key) {
                    yield (closed_key, std::mem::take(&mut run));
                }
            }
            run.push(item);
        }
        if let Some(closed_key) = run_key {
            yield (closed_key, run);
        }
    }
    .boxed()
}
/// Emits every window of `size` consecutive items, advancing one item at a time.
///
/// Nothing is emitted until `size` items have been seen; a source shorter
/// than `size` produces no windows. A `size` of 0 yields an empty stream.
pub fn sliding_window<T>(s: RS2Stream<T>, size: usize) -> RS2Stream<Vec<T>>
where
    T: Clone + Send + 'static,
{
    if size == 0 {
        return empty();
    }
    stream! {
        // A deque gives O(1) eviction at the front, and evicting before the
        // push keeps the length within the pre-allocated capacity.
        let mut window: std::collections::VecDeque<T> =
            std::collections::VecDeque::with_capacity(size);
        pin_mut!(s);
        while let Some(item) = s.next().await {
            if window.len() == size {
                window.pop_front();
            }
            window.push_back(item);
            if window.len() == size {
                yield window.iter().cloned().collect::<Vec<T>>();
            }
        }
    }
    .boxed()
}
pub fn batch_process<T, U, F>(
s: RS2Stream<T>,
batch_size: usize,
mut processor: F
) -> RS2Stream<U>
where
F: FnMut(Vec<T>) -> Vec<U> + Send + 'static,
T: Send + 'static,
U: Send + 'static,
{
stream! {
let chunked = chunk(s, batch_size);
pin_mut!(chunked);
while let Some(batch) = chunked.next().await {
for item in processor(batch) {
yield item;
}
}
}.boxed()
}
/// Wraps `s` so that every item passing through is recorded in a shared
/// `StreamMetrics` instance.
///
/// Returns the instrumented stream together with a handle to the metrics.
/// For each item, `record_item` is called with `size_of_val(&item)` — the
/// in-memory size of the value itself, not of any heap data it points to.
/// `finalize` is called once the source ends; it is not reached if the
/// returned stream is dropped before completion.
pub fn with_metrics<T>(
    s: RS2Stream<T>,
    name: String,
    thresholds: HealthThresholds,
) -> (RS2Stream<T>, Arc<Mutex<StreamMetrics>>)
where
    T: Send + 'static,
{
    let metrics = Arc::new(Mutex::new(
        StreamMetrics::new()
            .with_name(name)
            .with_health_thresholds(thresholds),
    ));
    let metrics_clone = Arc::clone(&metrics);
    let monitored_stream = stream! {
        pin_mut!(s);
        while let Some(item) = s.next().await {
            {
                // Scope the guard so the lock is released before yielding.
                let mut m = metrics_clone.lock().await;
                // Fully qualified: the bare name only resolves on toolchains
                // that export `size_of_val` from the prelude.
                m.record_item(std::mem::size_of_val(&item) as u64);
            }
            yield item;
        }
        {
            let mut m = metrics_clone.lock().await;
            m.finalize();
        }
    }
    .boxed();
    (monitored_stream, metrics)
}
pub fn auto_backpressure<O>(s: RS2Stream<O>, config: BackpressureConfig) -> RS2Stream<O>
where
O: Send + 'static,
{
match config.strategy {
BackpressureStrategy::Block => auto_backpressure_block(s, config.buffer_size),
BackpressureStrategy::DropOldest => auto_backpressure_drop_oldest(s, config.buffer_size),
BackpressureStrategy::DropNewest => auto_backpressure_drop_newest(s, config.buffer_size),
BackpressureStrategy::Error => auto_backpressure_error(s, config.buffer_size),
}
}
/// Decouples producer and consumer through a bounded channel.
///
/// A spawned task pulls from `s` and sends into a channel created with
/// `buffer_size`; when the channel is full the task waits, which is what
/// slows the source down. The task stops when the source ends or when the
/// returned stream (the receiving side) has been dropped.
///
/// Must be called from within a tokio runtime because it spawns a task.
pub fn auto_backpressure_block<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
where
    O: Send + 'static,
{
    let (mut sender, receiver): (Sender<O>, Receiver<O>) = channel(buffer_size);
    spawn(async move {
        let mut source = s;
        loop {
            let item = match source.next().await {
                Some(item) => item,
                None => break,
            };
            // A send error means the receiver is gone; stop pulling.
            if sender.send(item).await.is_err() {
                break;
            }
        }
    });
    stream! {
        let mut receiver = receiver;
        loop {
            match receiver.next().await {
                Some(item) => yield item,
                None => break,
            }
        }
    }
    .boxed()
}
pub fn auto_backpressure_drop_oldest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
where
O: Send + 'static,
{
use std::collections::VecDeque;
let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
let buffer_clone = Arc::clone(&buffer);
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
spawn(async move {
pin_mut!(s);
while let Some(item) = s.next().await {
let mut buf = buffer_clone.lock().await;
if buf.len() >= buffer_size {
buf.pop_front();
}
buf.push_back(item);
}
let _ = done_tx.send(()).await;
});
stream! {
let mut source_done = false;
loop {
if let Ok(_) = done_rx.try_recv() {
source_done = true;
}
let item = {
let mut buf = buffer.lock().await;
buf.pop_front()
};
match item {
Some(item) => yield item,
None => {
if source_done {
break;
}
tokio::time::sleep(Duration::from_millis(1)).await;
}
}
}
}
.boxed()
}
pub fn auto_backpressure_drop_newest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
where
O: Send + 'static,
{
use std::collections::VecDeque;
let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
let buffer_clone = Arc::clone(&buffer);
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
spawn(async move {
pin_mut!(s);
while let Some(item) = s.next().await {
let mut buf = buffer_clone.lock().await;
if buf.len() < buffer_size {
buf.push_back(item);
}
}
let _ = done_tx.send(()).await;
});
stream! {
let mut source_done = false;
loop {
if let Ok(_) = done_rx.try_recv() {
source_done = true;
}
let item = {
let mut buf = buffer.lock().await;
buf.pop_front()
};
match item {
Some(item) => yield item,
None => {
if source_done {
break;
}
tokio::time::sleep(Duration::from_millis(1)).await;
}
}
}
}
.boxed()
}
/// Forwards `s` through a bounded tokio channel of `buffer_size` slots.
///
/// A spawned task sends each source item into the channel and waits while it
/// is full; the returned stream yields whatever is received.
///
/// NOTE(review): despite the name, no error is ever produced — a full buffer
/// makes the producer wait, exactly like the blocking strategy. The item type
/// `O` leaves no room to report an overflow without changing the signature.
///
/// Must be called from within a tokio runtime because it spawns a task.
pub fn auto_backpressure_error<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
where
    O: Send + 'static,
{
    let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
    spawn(async move {
        let mut source = s;
        loop {
            let item = match source.next().await {
                Some(item) => item,
                None => break,
            };
            // A send error means the receiving stream was dropped.
            if sender.send(item).await.is_err() {
                break;
            }
        }
    });
    stream! {
        loop {
            match receiver.recv().await {
                Some(item) => yield item,
                None => break,
            }
        }
    }
    .boxed()
}
/// Yields items from `s` until `signal` completes.
///
/// The selection is biased towards `signal`: when both the signal and an item
/// are ready, the stream ends without emitting that item. The stream also
/// ends when `s` itself ends.
pub fn interrupt_when<O, F>(s: RS2Stream<O>, signal: F) -> RS2Stream<O>
where
    O: Send + 'static,
    F: Future<Output = ()> + Send + 'static,
{
    stream! {
        pin_mut!(s);
        pin_mut!(signal);
        loop {
            // `None` stands for "stop": either the signal fired or the source ended.
            let next = tokio::select! {
                biased;
                _ = &mut signal => None,
                maybe_item = s.next() => maybe_item,
            };
            match next {
                Some(item) => yield item,
                None => break,
            }
        }
    }
    .boxed()
}
/// Plays the given streams back to back, in the order of the vector.
///
/// Each stream is fully drained before the next one is polled.
pub fn concat<O, S>(streams: Vec<S>) -> RS2Stream<O>
where
    S: Stream<Item = O> + Send + 'static,
    O: Send + 'static,
{
    stream! {
        for inner in streams {
            pin_mut!(inner);
            loop {
                match inner.next().await {
                    Some(item) => yield item,
                    None => break,
                }
            }
        }
    }
    .boxed()
}
/// Combines two streams into one by emitting all of `s1`, then all of `s2`.
///
/// NOTE(review): despite the name, the streams are not interleaved — `s2` is
/// only polled after `s1` has ended, so an endless `s1` starves `s2`. This
/// ordering is unchanged from the previous implementation, which chained the
/// two streams through an intermediate `Option` wrapper.
pub fn merge<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
where
    S1: Stream<Item = O> + Send + 'static,
    S2: Stream<Item = O> + Send + 'static + Unpin,
    O: Send + 'static,
{
    s1.chain(s2).boxed()
}
/// Round-robins over the given streams, taking one item from each in turn.
///
/// A stream that ends is removed from the rotation; the result ends when all
/// streams have ended. Each turn awaits the current stream's next item, so a
/// stream that is slow to produce delays the whole rotation.
pub fn interleave<O, S>(streams: Vec<S>) -> RS2Stream<O>
where
    S: Stream<Item = O> + Send + 'static + Unpin,
    O: Send + 'static,
{
    if streams.is_empty() {
        return empty();
    }
    stream! {
        let mut active: Vec<_> = streams.into_iter().map(Box::pin).collect();
        let mut cursor = 0;
        while !active.is_empty() {
            // Wrap around after the last stream in the rotation.
            if cursor >= active.len() {
                cursor = 0;
            }
            let next = active[cursor].next().await;
            match next {
                Some(item) => {
                    yield item;
                    cursor += 1;
                }
                // Removal shifts the following stream into `cursor`,
                // so the cursor stays put.
                None => {
                    active.remove(cursor);
                }
            }
        }
    }
    .boxed()
}
/// Pairs up items from two streams and combines each pair with `f`.
///
/// Both streams are polled together for every pair. The result ends as soon
/// as either stream ends; an item already received from the other stream in
/// that round is discarded.
pub fn zip_with<A, B, O, F, S1, S2>(s1: S1, s2: S2, mut f: F) -> RS2Stream<O>
where
    S1: Stream<Item = A> + Send + 'static,
    S2: Stream<Item = B> + Send + 'static,
    F: FnMut(A, B) -> O + Send + 'static,
    A: Send + 'static,
    B: Send + 'static,
    O: Send + 'static,
{
    stream! {
        pin_mut!(s1);
        pin_mut!(s2);
        loop {
            let pair = future::join(s1.next(), s2.next()).await;
            if let (Some(a), Some(b)) = pair {
                yield f(a, b);
            } else {
                break;
            }
        }
    }
    .boxed()
}
/// Emits items from both streams until both have ended.
///
/// While both streams are live, each round first awaits one item from the
/// stream that produced most recently (`s1` initially) and then takes
/// whichever stream is ready next, preferring `s1` on ties. Once one stream
/// has ended, the other is drained to completion.
///
/// A stream is never polled again after it has returned `None`: when the
/// turn-based step observes the end of a stream, the round is cut short and
/// the loop switches straight to draining the survivor.
pub fn either<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
where
    S1: Stream<Item = O> + Send + 'static,
    S2: Stream<Item = O> + Send + 'static,
    O: Send + 'static,
{
    stream! {
        pin_mut!(s1);
        pin_mut!(s2);
        let mut s1_done = false;
        let mut s2_done = false;
        let mut using_s1 = true;
        loop {
            if s1_done {
                match s2.next().await {
                    Some(item) => yield item,
                    None => break,
                }
                continue;
            }
            if s2_done {
                match s1.next().await {
                    Some(item) => yield item,
                    None => break,
                }
                continue;
            }
            if using_s1 {
                match s1.next().await {
                    Some(item) => {
                        yield item;
                    },
                    None => {
                        // Skip the select below: it would poll the finished `s1` again.
                        s1_done = true;
                        continue;
                    }
                }
            } else {
                match s2.next().await {
                    Some(item) => {
                        yield item;
                    },
                    None => {
                        // Skip the select below: it would poll the finished `s2` again.
                        s2_done = true;
                        continue;
                    }
                }
            }
            tokio::select! {
                biased;
                maybe_item = s1.next() => {
                    match maybe_item {
                        Some(item) => {
                            yield item;
                            using_s1 = true;
                        },
                        None => {
                            s1_done = true;
                        }
                    }
                },
                maybe_item = s2.next() => {
                    match maybe_item {
                        Some(item) => {
                            yield item;
                            using_s1 = false;
                        },
                        None => {
                            s2_done = true;
                        }
                    }
                }
            }
        }
    }
    .boxed()
}
/// Emits an item only after the source has been quiet for `duration`.
///
/// Every incoming item replaces the pending one and restarts the quiet
/// period; the pending item is emitted when the period elapses without a new
/// arrival. If the source ends while an item is pending, that item is emitted
/// immediately before the stream ends.
///
/// The quiet period is tracked with a single timer that is re-armed in place,
/// so no background tasks are spawned and a timer that expired for an earlier
/// item can never trigger the emission of a later one.
pub fn debounce<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
where
    O: Send + 'static,
{
    stream! {
        pin_mut!(s);
        let mut latest_item: Option<O> = None;
        // Armed (re-created) whenever an item arrives; only awaited while an
        // item is pending, so its initial deadline is irrelevant.
        let quiet_timer = sleep(duration);
        pin_mut!(quiet_timer);
        loop {
            tokio::select! {
                maybe_item = s.next() => {
                    match maybe_item {
                        Some(item) => {
                            latest_item = Some(item);
                            quiet_timer.set(sleep(duration));
                        },
                        None => {
                            if let Some(item) = latest_item.take() {
                                yield item;
                            }
                            break;
                        }
                    }
                },
                _ = &mut quiet_timer, if latest_item.is_some() => {
                    if let Some(item) = latest_item.take() {
                        yield item;
                    }
                }
            }
        }
    }
    .boxed()
}
/// Suppresses consecutive duplicates.
///
/// An item is emitted only if it differs (by `PartialEq`) from the previously
/// emitted item; the first item is always emitted.
pub fn distinct_until_changed<O>(s: RS2Stream<O>) -> RS2Stream<O>
where
    O: Clone + Send + PartialEq + 'static,
{
    stream! {
        pin_mut!(s);
        let mut last_emitted: Option<O> = None;
        while let Some(item) = s.next().await {
            let unchanged = matches!(&last_emitted, Some(p) if p == &item);
            if unchanged {
                continue;
            }
            yield item.clone();
            last_emitted = Some(item);
        }
    }
    .boxed()
}
/// Emits the most recent source item once per `interval`.
///
/// On every tick, the latest item is emitted if a new one arrived since the
/// previous emission; ticks with nothing new emit nothing. When the source
/// ends, a not-yet-emitted latest item is flushed before the stream ends.
///
/// The timer's immediate first tick is consumed up front, so the first
/// emission can happen no earlier than one full interval after the first poll.
pub fn sample<O>(s: RS2Stream<O>, interval: Duration) -> RS2Stream<O>
where
    O: Clone + Send + 'static,
{
    stream! {
        pin_mut!(s);
        let mut latest: Option<O> = None;
        let mut fresh = false;
        let mut ticker = tokio::time::interval(interval);
        ticker.tick().await;
        loop {
            tokio::select! {
                maybe_item = s.next() => {
                    if let Some(item) = maybe_item {
                        latest = Some(item);
                        fresh = true;
                    } else {
                        if fresh {
                            if let Some(item) = latest.take() {
                                yield item;
                            }
                        }
                        break;
                    }
                },
                _ = ticker.tick() => {
                    if fresh {
                        // Emit a copy; the stored value is kept as-is.
                        if let Some(item) = latest.clone() {
                            yield item;
                            fresh = false;
                        }
                    }
                }
            }
        }
    }
    .boxed()
}
/// Suppresses consecutive duplicates according to a custom equality.
///
/// `eq(previous, current)` is called with the previously emitted item and the
/// incoming one; the incoming item is dropped when it returns `true`. The
/// first item is always emitted.
pub fn distinct_until_changed_by<O, F>(s: RS2Stream<O>, mut eq: F) -> RS2Stream<O>
where
    O: Clone + Send + 'static,
    F: FnMut(&O, &O) -> bool + Send + 'static,
{
    stream! {
        pin_mut!(s);
        let mut last_emitted: Option<O> = None;
        while let Some(item) = s.next().await {
            let unchanged = match &last_emitted {
                Some(p) => eq(p, &item),
                None => false,
            };
            if !unchanged {
                yield item.clone();
                last_emitted = Some(item);
            }
        }
    }
    .boxed()
}
/// Eagerly pulls items from `s` ahead of the consumer.
///
/// With `prefetch_count == 0` the source is returned untouched. Otherwise a
/// spawned task forwards the source into a channel created with
/// `prefetch_count`, so items are produced in the background while the
/// consumer is busy. The task stops when the source ends or the returned
/// stream is dropped.
///
/// Must be called from within a tokio runtime when `prefetch_count > 0`.
pub fn prefetch<O>(s: RS2Stream<O>, prefetch_count: usize) -> RS2Stream<O>
where
    O: Send + 'static,
{
    if prefetch_count == 0 {
        return s;
    }
    let (mut sender, receiver): (Sender<O>, Receiver<O>) = channel(prefetch_count);
    spawn(async move {
        let mut source = s;
        while let Some(item) = source.next().await {
            let receiver_gone = sender.send(item).await.is_err();
            if receiver_gone {
                break;
            }
        }
    });
    stream! {
        let mut receiver = receiver;
        loop {
            match receiver.next().await {
                Some(item) => yield item,
                None => break,
            }
        }
    }
    .boxed()
}
/// Applies blocking backpressure with a buffer of `capacity` items.
///
/// This is a thin alias for `auto_backpressure_block(s, capacity)`.
pub fn rate_limit_backpressure<O>(s: RS2Stream<O>, capacity: usize) -> RS2Stream<O>
where
O: Send + 'static,
{
auto_backpressure_block(s, capacity)
}
/// Spaces out emissions by sleeping for `duration` after every item.
///
/// The first item is emitted without delay; the source is not polled for the
/// next item until the pause has elapsed.
pub fn throttle<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
where
    O: Send + 'static,
{
    stream! {
        let mut source = s;
        loop {
            match source.next().await {
                Some(item) => yield item,
                None => break,
            }
            sleep(duration).await;
        }
    }
    .boxed()
}
/// Endless stream that emits a clone of `item`, then sleeps for `period`, forever.
///
/// The first emission happens immediately on the first poll.
pub fn tick<O>(period: Duration, item: O) -> RS2Stream<O>
where
    O: Clone + Send + 'static,
{
    stream! {
        let template = item;
        loop {
            let next = template.clone();
            yield next;
            sleep(period).await;
        }
    }
    .boxed()
}
/// Maps items through the async function `f`, running up to `concurrency`
/// futures at once while emitting results in the order of the input.
///
/// Use `par_eval_map_unordered` when results may be emitted as soon as they
/// complete regardless of input order.
///
/// The source is first decoupled through `auto_backpressure_block` with a
/// buffer of twice the concurrency. A `concurrency` of 0 is treated as 1
/// (sequential evaluation).
pub fn par_eval_map<I, O, Fut, F>(s: RS2Stream<I>, concurrency: usize, f: F) -> RS2Stream<O>
where
    F: FnMut(I) -> Fut + Send + 'static,
    Fut: Future<Output = O> + Send + 'static,
    O: Send + 'static,
    I: Send + 'static,
{
    let limit = concurrency.max(1);
    // saturating_mul: a huge limit must not wrap around to a tiny buffer.
    let buffered_stream = auto_backpressure_block(s, limit.saturating_mul(2));
    // `buffered` drives all in-flight futures while waiting for input and
    // hands results back in submission order.
    buffered_stream.map(f).buffered(limit).boxed()
}
/// Maps items through the async function `f`, running up to `concurrency`
/// futures at once and emitting each result as soon as it completes.
///
/// Output order follows completion order, not input order. The source is
/// first decoupled through `auto_backpressure_block` with a buffer of twice
/// the concurrency. A `concurrency` of 0 is treated as 1.
/// NOTE(review): a zero limit passed to `buffer_unordered` is not a usable
/// bound and its handling has varied — confirm against the pinned futures version.
pub fn par_eval_map_unordered<I, O, Fut, F>(
    s: RS2Stream<I>,
    concurrency: usize,
    f: F,
) -> RS2Stream<O>
where
    F: FnMut(I) -> Fut + Send + 'static,
    Fut: Future<Output = O> + Send + 'static,
    O: Send + 'static,
    I: Send + 'static,
{
    let limit = concurrency.max(1);
    // saturating_mul: a huge limit must not wrap around to a tiny buffer.
    let buffered_stream = auto_backpressure_block(s, limit.saturating_mul(2));
    buffered_stream.map(f).buffer_unordered(limit).boxed()
}
/// Flattens a stream of streams, running up to `concurrency` inner streams
/// at the same time.
///
/// Items are emitted as soon as any active inner stream produces one, so the
/// relative order of items from different inner streams is unspecified. A new
/// inner stream is pulled from the outer stream whenever fewer than
/// `concurrency` are active. The result ends when the outer stream and all
/// inner streams have ended.
///
/// A `concurrency` of 0 is treated as 1; a zero limit could never admit an
/// inner stream and would leave the loop spinning without making progress.
pub fn par_join<O, S>(
    s: RS2Stream<S>,
    concurrency: usize,
) -> RS2Stream<O>
where
    S: Stream<Item = O> + Send + 'static + Unpin,
    O: Send + 'static,
{
    let limit = concurrency.max(1);
    let buffered_stream = auto_backpressure_block(s, limit.saturating_mul(2));
    stream! {
        pin_mut!(buffered_stream);
        let mut active_streams = stream::SelectAll::<S>::new();
        let mut outer_stream_done = false;
        loop {
            let can_pull = !outer_stream_done && active_streams.len() < limit;
            let has_active = !active_streams.is_empty();
            if !can_pull && !has_active {
                break;
            }
            // At least one branch is enabled here, so the select always has
            // something to wait on.
            tokio::select! {
                maybe_inner = buffered_stream.next(), if can_pull => {
                    match maybe_inner {
                        Some(inner_stream) => active_streams.push(inner_stream),
                        None => outer_stream_done = true,
                    }
                },
                maybe_item = active_streams.next(), if has_active => {
                    // `None` only means the current inner streams all ended;
                    // the loop condition decides whether to continue.
                    if let Some(item) = maybe_item {
                        yield item;
                    }
                }
            }
        }
    }
    .boxed()
}
/// Acquire / use / release pattern for streams.
///
/// Awaits `acquire`, passes a clone of the resource to `use_fn` to build the
/// body stream, forwards all of its items, and finally awaits
/// `release(resource)`.
///
/// `release` runs only after the body stream has ended; it is not reached if
/// the returned stream is dropped before being drained.
pub fn bracket<A, O, St, FAcq, FUse, FRel, R>(
    acquire: FAcq,
    use_fn: FUse,
    release: FRel,
) -> RS2Stream<O>
where
    FAcq: Future<Output = A> + Send + 'static,
    FUse: FnOnce(A) -> St + Send + 'static,
    St: Stream<Item = O> + Send + 'static,
    FRel: FnOnce(A) -> R + Send + 'static,
    R: Future<Output = ()> + Send + 'static,
    O: Send + 'static,
    A: Clone + Send + 'static,
{
    stream! {
        let resource = acquire.await;
        let resource_for_use = resource.clone();
        let body = use_fn(resource_for_use);
        pin_mut!(body);
        loop {
            match body.next().await {
                Some(item) => yield item,
                None => break,
            }
        }
        release(resource).await;
    }
    .boxed()
}
/// Acquire / use / release pattern that tells `release` how the body ended.
///
/// Awaits `acquire`, passes a clone of the resource to `use_fn` to build the
/// body stream, and forwards its `Ok` items. The body is considered finished
/// when it ends or when it produces its first `Err`:
///
/// * body ended normally — `release(resource, ExitCase::Completed)` is awaited
///   and the stream ends;
/// * body produced `Err(e)` — `release(resource, ExitCase::Errored(e.clone()))`
///   is awaited first, then `Err(e)` is emitted as the final item. Releasing
///   before emitting the error means cleanup has already happened by the time
///   a consumer that stops at the first error sees it.
///
/// `release` is not reached if the returned stream is dropped before the body
/// has finished.
pub fn bracket_case<A, O, E, St, FAcq, FUse, FRel, R>(
    acquire: FAcq,
    use_fn: FUse,
    release: FRel,
) -> RS2Stream<Result<O, E>>
where
    FAcq: Future<Output = A> + Send + 'static,
    FUse: FnOnce(A) -> St + Send + 'static,
    St: Stream<Item = Result<O, E>> + Send + 'static,
    FRel: FnOnce(A, ExitCase<E>) -> R + Send + 'static,
    R: Future<Output = ()> + Send + 'static,
    O: Send + 'static,
    E: Clone + Send + 'static,
    A: Clone + Send + 'static,
{
    stream! {
        let resource = acquire.await;
        let stream = use_fn(resource.clone());
        pin_mut!(stream);
        let mut failure: Option<E> = None;
        while let Some(item) = stream.next().await {
            match item {
                Ok(value) => yield Ok(value),
                Err(error) => {
                    failure = Some(error);
                    break;
                }
            }
        }
        match failure {
            Some(error) => {
                release(resource, ExitCase::Errored(error.clone())).await;
                yield Err(error);
            }
            None => release(resource, ExitCase::Completed).await,
        }
    }
    .boxed()
}
pub use crate::rs2_result_stream_ext::RS2ResultStreamExt;
pub use crate::rs2_stream_ext::RS2StreamExt;