use super::active_read::ActiveRead;
use super::connector::{Connection, Connector};
use super::{Client, TonicStreaming};
use crate::error::ReadError;
use crate::google::storage::v2::{BidiReadObjectRequest, BidiReadObjectResponse, ObjectRangeData};
use gaxi::grpc::tonic::{Result as TonicResult, Status};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Receiver, Sender};
type ReadResult<T> = std::result::Result<T, ReadError>;
type LoopResult<T> = std::result::Result<T, Arc<crate::Error>>;
#[derive(Debug)]
pub struct Worker<C> {
next_range_id: i64,
ranges: Arc<Mutex<HashMap<i64, ActiveRead>>>,
connector: Connector<C>,
}
impl<C> Worker<C> {
pub fn new(connector: Connector<C>, request_ranges: Vec<ActiveRead>) -> Self {
let hash = HashMap::from_iter(
request_ranges
.into_iter()
.enumerate()
.map(|(id, r)| (id as i64, r)),
);
let next_range_id = hash.len() as i64;
let ranges = Arc::new(Mutex::new(hash));
Self {
next_range_id,
ranges,
connector,
}
}
}
impl<C> Worker<C>
where
C: Client + Clone + 'static,
<C as Client>::Stream: TonicStreaming,
{
pub async fn run(
mut self,
connection: Connection<C::Stream>,
mut requests: Receiver<ActiveRead>,
) -> LoopResult<()> {
let mut ranges = Vec::new();
let (mut rx, mut tx) = (connection.rx, connection.tx);
let error = loop {
tokio::select! {
m = rx.next_message() => {
match self.handle_response(m).await {
None => break None,
Some(Err(e)) => break Some(e),
Some(Ok(None)) => {},
Some(Ok(Some(connection))) => {
(rx, tx) = (connection.rx, connection.tx);
}
};
},
r = requests.recv_many(&mut ranges, 16) => {
if r == 0 {
break None;
};
self.insert_ranges(tx.clone(), std::mem::take(&mut ranges)).await;
},
}
};
drop(rx);
drop(tx);
let Some(e) = error else {
return Ok(());
};
while let Some(mut r) = requests.recv().await {
println!("sending error after closed stream: {e:?}");
r.interrupted(e.clone()).await;
}
Err(e)
}
pub async fn handle_response(
&mut self,
message: TonicResult<Option<BidiReadObjectResponse>>,
) -> Option<LoopResult<Option<Connection<C::Stream>>>> {
let response = match message.transpose()? {
Ok(r) => r,
Err(status) => return self.reconnect(status).await,
};
match self.handle_response_success(response).await {
Err(e) => Some(Err(e)),
Ok(_) => Some(Ok(None)),
}
}
pub async fn handle_response_success(
&mut self,
response: BidiReadObjectResponse,
) -> LoopResult<()> {
if let Err(e) = self.handle_ranges(response.object_data_ranges).await {
let error = Arc::new(e);
self.close_readers(error.clone()).await;
return Err(error);
}
Ok(())
}
async fn handle_ranges(&self, data: Vec<ObjectRangeData>) -> crate::Result<()> {
let mut result = Ok(());
for response in data {
if let Err(e) = Self::handle_range_data(self.ranges.clone(), response).await {
result = result.and(Err(e));
}
}
result.map_err(crate::Error::io)
}
async fn reconnect(
&mut self,
status: Status,
) -> Option<LoopResult<Option<Connection<C::Stream>>>> {
let ranges: Vec<_> = self
.ranges
.lock()
.await
.iter()
.map(|(id, r)| r.as_proto(*id))
.collect();
let (response, _headers, connection) = match self.connector.reconnect(status, ranges).await
{
Err(e) => {
let error = Arc::new(e);
self.close_readers(error.clone()).await;
return Some(Err(error));
}
Ok(t) => t,
};
if let Err(e) = self.handle_ranges(response.object_data_ranges).await {
let error = Arc::new(e);
self.close_readers(error.clone()).await;
return Some(Err(error));
}
Some(Ok(Some(connection)))
}
async fn close_readers(&mut self, error: Arc<crate::Error>) {
use futures::StreamExt;
let mut guard = self.ranges.lock().await;
let closing = futures::stream::FuturesUnordered::new();
for (_, active) in guard.iter_mut() {
closing.push(active.interrupted(error.clone()));
}
let _ = closing.count().await;
guard.clear();
}
async fn insert_ranges(&mut self, tx: Sender<BidiReadObjectRequest>, readers: Vec<ActiveRead>) {
let mut ranges = Vec::new();
for r in readers {
let id = self.next_range_id;
self.next_range_id += 1;
let request = r.as_proto(id);
self.ranges.lock().await.insert(id, r);
ranges.push(request);
}
let request = BidiReadObjectRequest {
read_ranges: ranges,
..BidiReadObjectRequest::default()
};
if let Err(e) = tx.send(request).await {
tracing::error!("error sending read range request: {e:?}");
}
}
async fn handle_range_data(
ranges: Arc<Mutex<HashMap<i64, ActiveRead>>>,
response: ObjectRangeData,
) -> ReadResult<()> {
let range = response
.read_range
.ok_or(ReadError::InvalidBidiStreamingReadResponse(
"missing range".into(),
))?;
let handler = if response.range_end {
let mut pending = ranges.lock().await.remove(&range.read_id).ok_or(
ReadError::InvalidBidiStreamingReadResponse(
format!("unknown read id ({})", range.read_id).into(),
),
)?;
pending.handle_data(response.checksummed_data, range, true)?
} else {
let mut guard = ranges.lock().await;
let pending = guard.get_mut(&range.read_id).ok_or(
ReadError::InvalidBidiStreamingReadResponse(
format!("unknown read id ({}", range.read_id).into(),
),
)?;
pending.handle_data(response.checksummed_data, range, false)?
};
handler.send().await;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::super::mocks::{MockTestClient, mock_connector, mock_stream};
use super::super::tests::{proto_range_id, redirect_status};
use super::*;
use crate::google::storage::v2::{
BidiReadHandle, BidiReadObjectSpec, ChecksummedData, Object, ReadRange as ProtoRange,
};
use crate::model_ext::ReadRange;
use crate::storage::bidi::tests::permanent_error;
use gaxi::grpc::tonic::Response as TonicResponse;
use std::error::Error as _;
use test_case::test_case;
#[tokio::test]
async fn run_immediately_closed() -> anyhow::Result<()> {
let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1);
let (response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(1);
let connection = Connection::new(request_tx, response_rx);
let mut mock = MockTestClient::new();
mock.expect_start().never();
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
let handle = tokio::spawn(worker.run(connection, rx));
drop(response_tx);
drop(tx);
handle.await??;
Ok(())
}
#[test_case(true)]
#[test_case(false)]
#[tokio::test]
async fn run_bad_response(range_end: bool) -> anyhow::Result<()> {
let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1);
let (response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(1);
let connection = Connection::new(request_tx, response_rx);
let response = BidiReadObjectResponse {
object_data_ranges: vec![ObjectRangeData {
read_range: Some(proto_range_id(0, 100, -123)),
range_end,
..ObjectRangeData::default()
}],
..BidiReadObjectResponse::default()
};
response_tx.send(Ok(response)).await?;
let mut mock = MockTestClient::new();
mock.expect_start().never();
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
let handle = tokio::spawn(worker.run(connection, rx));
response_tx.closed().await;
drop(tx);
let err = handle.await?.unwrap_err();
assert!(err.is_transport(), "{err:?}");
let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
assert!(
matches!(source, Some(ReadError::InvalidBidiStreamingReadResponse(_))),
"{err:?}"
);
Ok(())
}
#[tokio::test]
async fn run_reconnect() -> anyhow::Result<()> {
let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1);
let (response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(1);
let connection = Connection::new(request_tx, response_rx);
response_tx
.send(Err(redirect_status("redirect-01")))
.await?;
let mut mock = MockTestClient::new();
let (reconnected_tx, reconnected_rx) = tokio::sync::oneshot::channel();
mock.expect_start().return_once(move |_, _, _, _, _, _| {
let _ = reconnected_tx.send(());
Err(permanent_error())
});
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
let handle = tokio::spawn(worker.run(connection, rx));
reconnected_rx.await?;
drop(tx);
let err = handle.await?.unwrap_err();
assert_eq!(err.status(), permanent_error().status());
Ok(())
}
#[tokio::test]
async fn run_stop_on_closed_requests() -> anyhow::Result<()> {
let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1);
let (_response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(1);
let connection = Connection::new(request_tx, response_rx);
let mut mock = MockTestClient::new();
mock.expect_start().never();
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
drop(tx);
worker.run(connection, rx).await?;
Ok(())
}
#[tokio::test]
async fn run_partial_read() -> anyhow::Result<()> {
let SuccessfulReadSetup {
join,
response_tx,
request,
mut reader,
tx,
} = set_up_successful_read().await;
let content = bytes::Bytes::from_owner(String::from_iter((0..100).map(|_| 'x')));
let response = BidiReadObjectResponse {
object_data_ranges: vec![ObjectRangeData {
read_range: Some(proto_range_id(100, content.len() as i64, request.read_id)),
checksummed_data: Some(ChecksummedData {
content: content.clone(),
..ChecksummedData::default()
}),
..ObjectRangeData::default()
}],
..BidiReadObjectResponse::default()
};
response_tx.send(Ok(response)).await?;
let got = reader.recv().await;
assert!(matches!(got, Some(Ok(ref b)) if *b == content), "{got:?}");
assert!(reader.is_empty(), "{reader:?}");
drop(tx);
join.await??;
let got = reader.recv().await;
assert!(got.is_none(), "{got:?}");
Ok(())
}
#[tokio::test]
async fn run_full_read() -> anyhow::Result<()> {
let SuccessfulReadSetup {
join,
response_tx,
request,
mut reader,
tx,
} = set_up_successful_read().await;
let content = bytes::Bytes::from_owner(String::from_iter((0..100).map(|_| 'x')));
let response = BidiReadObjectResponse {
object_data_ranges: vec![ObjectRangeData {
read_range: Some(proto_range_id(100, content.len() as i64, request.read_id)),
range_end: true,
checksummed_data: Some(ChecksummedData {
content: content.clone(),
..ChecksummedData::default()
}),
}],
..BidiReadObjectResponse::default()
};
response_tx.send(Ok(response)).await?;
let got = reader.recv().await;
assert!(matches!(got, Some(Ok(ref b)) if *b == content), "{got:?}");
let got = reader.recv().await;
assert!(got.is_none(), "{got:?}");
drop(tx);
join.await??;
Ok(())
}
#[tokio::test]
async fn run_batched_reads() -> anyhow::Result<()> {
let MockSetup {
join,
tx,
response_tx,
mut request_rx,
} = set_up_mock().await;
let reader1 = mock_reader(&tx, ReadRange::segment(100, 42)).await;
let reader2 = mock_reader(&tx, ReadRange::segment(200, 42)).await;
let reader3 = mock_reader(&tx, ReadRange::segment(300, 42)).await;
let request1 = request_rx.recv().await;
let content = bytes::Bytes::from_owner(String::from_iter((0..42).map(|_| 'x')));
let response = BidiReadObjectResponse {
object_data_ranges: (0..3)
.map(|i| ObjectRangeData {
read_range: Some(proto_range_id(100 + i * 100, content.len() as i64, i)),
range_end: true,
checksummed_data: Some(ChecksummedData {
content: content.clone(),
..ChecksummedData::default()
}),
})
.collect(),
..BidiReadObjectResponse::default()
};
response_tx.send(Ok(response)).await?;
for (i, mut r) in [reader1, reader2, reader3].into_iter().enumerate() {
let got = r.recv().await;
assert!(
matches!(got, Some(Ok(ref b)) if *b == content),
"[{i}] {got:?}"
);
let got = r.recv().await;
assert!(got.is_none(), "[{i}] {got:?}");
}
drop(tx);
join.await??;
let request2 = request_rx.recv().await;
let e0 = proto_range_id(100, 42, 0);
let e1 = proto_range_id(200, 42, 1);
let e2 = proto_range_id(300, 42, 2);
match (request1, request2) {
(Some(r), None) if r.read_ranges.len() == 3 => {
assert!(r.read_object_spec.is_none(), "{r:?}");
assert_eq!(r.read_ranges, vec![e0, e1, e2], "{r:?}");
}
(Some(r), Some(n)) if n.read_ranges.len() == 2 => {
assert!(r.read_object_spec.is_none(), "{r:?}");
assert_eq!(r.read_ranges, vec![e0], "{r:?}");
assert!(n.read_object_spec.is_none(), "{n:?}");
assert_eq!(n.read_ranges, vec![e1, e2], "{r:?}");
}
(Some(r), None) => {
panic!("initial request did not have enough messages: {r:?}")
}
(Some(r), Some(n)) => {
panic!("first or second requests have too few messages r={r:?}, n={n:?}")
}
(None, n) => {
panic!("first message is none and second is not {n:?}")
}
}
Ok(())
}
struct SuccessfulReadSetup {
join: tokio::task::JoinHandle<Result<(), Arc<crate::Error>>>,
tx: Sender<ActiveRead>,
reader: Receiver<ReadResult<bytes::Bytes>>,
response_tx: Sender<TonicResult<BidiReadObjectResponse>>,
request: ProtoRange,
}
async fn set_up_successful_read() -> SuccessfulReadSetup {
let MockSetup {
join,
tx,
response_tx,
mut request_rx,
} = set_up_mock().await;
let reader = mock_reader(&tx, ReadRange::segment(100, 100)).await;
let request = request_rx
.recv()
.await
.expect("request queue is not closed");
assert!(request.read_object_spec.is_none(), "{request:?}");
assert_eq!(request.read_ranges.len(), 1, "{request:?}");
let request = request.read_ranges.first().unwrap();
assert_eq!(request.read_offset, 100);
assert_eq!(request.read_length, 100);
SuccessfulReadSetup {
join,
tx,
reader,
response_tx,
request: *request,
}
}
struct MockSetup {
join: tokio::task::JoinHandle<Result<(), Arc<crate::Error>>>,
tx: Sender<ActiveRead>,
response_tx: Sender<TonicResult<BidiReadObjectResponse>>,
request_rx: Receiver<BidiReadObjectRequest>,
}
async fn set_up_mock() -> MockSetup {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
let (response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(16);
let connection = Connection::new(request_tx, response_rx);
let mut mock = MockTestClient::new();
mock.expect_start().never();
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
let join = tokio::spawn(async move { worker.run(connection, rx).await });
MockSetup {
join,
tx,
response_tx,
request_rx,
}
}
#[tokio::test]
async fn run_reconnect_with_pending_reads() -> anyhow::Result<()> {
let (request_tx, mut request_rx) = tokio::sync::mpsc::channel(4);
let (response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(4);
let connection = Connection::new(request_tx, response_rx);
let receivers = Arc::new(std::sync::Mutex::new(Vec::new()));
let save = receivers.clone();
let mut mock = MockTestClient::new();
mock.expect_start().return_once(move |_, _, rx, _, _, _| {
save.lock().expect("never poisoned").push(rx);
Err(permanent_error())
});
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
let worker = tokio::spawn(async move { worker.run(connection, rx).await });
let mut reader = mock_reader(&tx, ReadRange::tail(100)).await;
let request = request_rx
.recv()
.await
.expect("request queue is not closed");
assert!(request.read_object_spec.is_none(), "{request:?}");
let read_id = request
.read_ranges
.first()
.expect("at least one range")
.read_id;
response_tx
.send(Err(redirect_status("redirect-01")))
.await?;
let got = reader.recv().await;
assert!(
matches!(got, Some(Err(ReadError::UnrecoverableBidiReadInterrupt(ref e))) if e.status() == permanent_error().status()),
"{got:?}"
);
let got = reader.recv().await;
assert!(got.is_none(), "{got:?}");
let mut reconnect_rx = {
let mut guard = receivers.lock().expect("never poisoned");
let rx = guard.pop().expect("at least one receiver");
assert!(guard.is_empty(), "{receivers:?}");
rx
};
let first = reconnect_rx
.recv()
.await
.expect("non-empty request in reconnect");
let want = BidiReadObjectSpec {
bucket: "projects/_/buckets/test-bucket".into(),
object: "test-object".into(),
read_handle: Some(BidiReadHandle {
handle: bytes::Bytes::from_static(b"test-handle-redirect"),
}),
routing_token: Some("redirect-01".into()),
..BidiReadObjectSpec::default()
};
assert_eq!(first.read_object_spec, Some(want), "{first:?}");
assert_eq!(
first
.read_ranges
.first()
.map(|r| (r.read_id, r.read_offset)),
Some((read_id, -100)),
"{first:?}"
);
drop(tx);
let err = worker.await?.unwrap_err();
assert_eq!(err.status(), permanent_error().status());
Ok(())
}
#[tokio::test]
async fn run_reconnect_with_successful_read() -> anyhow::Result<()> {
const LEN: i64 = 42;
let (request_tx, mut request_rx) = tokio::sync::mpsc::channel(4);
let (response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(4);
let connection = Connection::new(request_tx, response_rx);
let receivers = Arc::new(std::sync::Mutex::new(Vec::new()));
let save = receivers.clone();
let (reconnect_tx, reconnect_rx) =
tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
let initial = BidiReadObjectResponse {
metadata: Some(Object {
generation: 123456,
..Object::default()
}),
object_data_ranges: vec![ObjectRangeData {
checksummed_data: Some(ChecksummedData {
content: bytes::Bytes::from_owner(String::from_iter((0..LEN).map(|_| 'x'))),
..ChecksummedData::default()
}),
read_range: Some(ProtoRange {
read_offset: 100,
read_length: LEN,
read_id: 0,
}),
range_end: true,
}],
..BidiReadObjectResponse::default()
};
reconnect_tx.send(Ok(initial)).await?;
let reconnect_stream = TonicResponse::from(reconnect_rx);
let mut mock = MockTestClient::new();
mock.expect_start().return_once(move |_, _, rx, _, _, _| {
save.lock().expect("never poisoned").push(rx);
Ok(Ok(reconnect_stream))
});
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
let worker = tokio::spawn(async move { worker.run(connection, rx).await });
let mut reader = mock_reader(&tx, ReadRange::offset(100)).await;
let request = request_rx
.recv()
.await
.expect("request queue is not closed");
assert!(request.read_object_spec.is_none(), "{request:?}");
let read_id = request
.read_ranges
.first()
.expect("at least one range")
.read_id;
response_tx
.send(Err(redirect_status("redirect-01")))
.await?;
let got = reader.recv().await;
assert!(
matches!(got, Some(Ok(ref b)) if b.len() == LEN as usize),
"{got:?}"
);
let mut reconnect_rx = {
let mut guard = receivers.lock().expect("never poisoned");
let rx = guard.pop().expect("at least one receiver");
assert!(guard.is_empty(), "{receivers:?}");
rx
};
let first = reconnect_rx
.recv()
.await
.expect("non-empty request in reconnect");
let want = BidiReadObjectSpec {
bucket: "projects/_/buckets/test-bucket".into(),
object: "test-object".into(),
read_handle: Some(BidiReadHandle {
handle: bytes::Bytes::from_static(b"test-handle-redirect"),
}),
routing_token: Some("redirect-01".into()),
..BidiReadObjectSpec::default()
};
assert_eq!(first.read_object_spec, Some(want), "{first:?}");
assert_eq!(
first
.read_ranges
.first()
.map(|r| (r.read_id, r.read_offset)),
Some((read_id, 100)),
"{first:?}"
);
drop(tx); worker.await??;
Ok(())
}
#[tokio::test]
async fn run_reconnect_with_error_read() -> anyhow::Result<()> {
const LEN: i64 = 42;
let (request_tx, mut request_rx) = tokio::sync::mpsc::channel(4);
let (response_tx, response_rx) = mock_stream();
let (tx, rx) = tokio::sync::mpsc::channel(4);
let connection = Connection::new(request_tx, response_rx);
let receivers = Arc::new(std::sync::Mutex::new(Vec::new()));
let save = receivers.clone();
let (reconnect_tx, reconnect_rx) =
tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
let initial = BidiReadObjectResponse {
metadata: Some(Object {
generation: 123456,
..Object::default()
}),
object_data_ranges: vec![ObjectRangeData {
checksummed_data: Some(ChecksummedData {
content: bytes::Bytes::from_owner(String::from_iter((0..LEN).map(|_| 'x'))),
..ChecksummedData::default()
}),
read_range: Some(ProtoRange {
read_offset: 100,
read_length: LEN,
read_id: -123456, }),
range_end: true,
}],
..BidiReadObjectResponse::default()
};
reconnect_tx.send(Ok(initial)).await?;
let reconnect_stream = TonicResponse::from(reconnect_rx);
let mut mock = MockTestClient::new();
mock.expect_start().return_once(move |_, _, rx, _, _, _| {
save.lock().expect("never poisoned").push(rx);
Ok(Ok(reconnect_stream))
});
let connector = mock_connector(mock);
let worker = Worker::new(connector, Vec::new());
let worker = tokio::spawn(async move { worker.run(connection, rx).await });
let mut reader = mock_reader(&tx, ReadRange::offset(100)).await;
let request = request_rx
.recv()
.await
.expect("request queue is not closed");
assert!(request.read_object_spec.is_none(), "{request:?}");
let read_id = request
.read_ranges
.first()
.expect("at least one range")
.read_id;
response_tx
.send(Err(redirect_status("redirect-01")))
.await?;
let got = reader.recv().await;
assert!(
matches!(got, Some(Err(ReadError::UnrecoverableBidiReadInterrupt(_)))),
"{got:?}"
);
let mut reconnect_rx = {
let mut guard = receivers.lock().expect("never poisoned");
let rx = guard.pop().expect("at least one receiver");
assert!(guard.is_empty(), "{receivers:?}");
rx
};
let first = reconnect_rx
.recv()
.await
.expect("non-empty request in reconnect");
let want = BidiReadObjectSpec {
bucket: "projects/_/buckets/test-bucket".into(),
object: "test-object".into(),
read_handle: Some(BidiReadHandle {
handle: bytes::Bytes::from_static(b"test-handle-redirect"),
}),
routing_token: Some("redirect-01".into()),
..BidiReadObjectSpec::default()
};
assert_eq!(first.read_object_spec, Some(want), "{first:?}");
assert_eq!(
first
.read_ranges
.first()
.map(|r| (r.read_id, r.read_offset)),
Some((read_id, 100)),
"{first:?}"
);
drop(tx); let err = worker.await?.unwrap_err();
assert!(err.is_transport(), "{err:?}");
let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
assert!(
matches!(source, Some(ReadError::InvalidBidiStreamingReadResponse(_))),
"{err:?}"
);
Ok(())
}
async fn mock_reader(
requests: &Sender<ActiveRead>,
range: ReadRange,
) -> Receiver<ReadResult<bytes::Bytes>> {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let reader = ActiveRead::new(tx, range.0);
requests
.send(reader)
.await
.expect("requests queue is not closed in tests");
rx
}
}