mod active_read;
pub(crate) mod connector;
mod normalized_range;
mod range_reader;
mod redirect;
mod remaining_range;
mod requested_range;
mod resume_redirect;
mod retry_redirect;
pub(crate) mod stub;
pub(crate) mod transport;
mod worker;
use crate::google::storage::v2::{BidiReadObjectRequest, BidiReadObjectResponse};
use crate::request_options::RequestOptions;
use gaxi::grpc::tonic::{Extensions, Response as TonicResponse, Result as TonicResult, Streaming};
use tokio::sync::mpsc::Receiver;
pub use super::open_object::OpenObject;
pub trait TonicStreaming: std::fmt::Debug + Send + 'static {
fn next_message(
&mut self,
) -> impl Future<Output = TonicResult<Option<BidiReadObjectResponse>>> + Send;
}
impl TonicStreaming for Streaming<BidiReadObjectResponse> {
async fn next_message(&mut self) -> TonicResult<Option<BidiReadObjectResponse>> {
self.message().await
}
}
pub trait Client: std::fmt::Debug + Send + 'static {
type Stream: Sized;
fn start(
&self,
extensions: Extensions,
path: http::uri::PathAndQuery,
rx: Receiver<BidiReadObjectRequest>,
options: &RequestOptions,
api_client_header: &'static str,
request_params: &str,
) -> impl Future<Output = crate::Result<TonicResult<TonicResponse<Self::Stream>>>> + Send;
}
impl Client for gaxi::grpc::Client {
type Stream = Streaming<BidiReadObjectResponse>;
async fn start(
&self,
extensions: Extensions,
path: http::uri::PathAndQuery,
rx: Receiver<BidiReadObjectRequest>,
options: &RequestOptions,
api_client_header: &'static str,
request_params: &str,
) -> crate::Result<TonicResult<TonicResponse<Self::Stream>>> {
let request = tokio_stream::wrappers::ReceiverStream::new(rx);
self.bidi_stream_with_status(
extensions,
path,
request,
options.gax(),
api_client_header,
request_params,
)
.await
}
}
#[cfg(test)]
mod mocks;
#[cfg(test)]
mod tests {
use crate::Error;
use crate::google::storage::v2::{
BidiReadHandle, BidiReadObjectRedirectedError, ReadRange as ProtoRange,
};
use crate::request_options::RequestOptions;
use gaxi::grpc::tonic::{Code as TonicCode, Status as TonicStatus};
use google_cloud_gax::error::rpc::{Code, Status};
use prost::Message as _;
use std::sync::Arc;
pub(super) fn redirect_handle() -> BidiReadHandle {
BidiReadHandle {
handle: bytes::Bytes::from_static(b"test-handle-redirect"),
}
}
pub(super) fn redirect_status(routing: &str) -> TonicStatus {
use crate::google::rpc::Status as RpcStatus;
let redirect = BidiReadObjectRedirectedError {
routing_token: Some(routing.to_string()),
read_handle: Some(redirect_handle()),
};
let redirect = prost_types::Any::from_msg(&redirect).unwrap();
let status = RpcStatus {
code: Code::Aborted as i32,
message: "redirect".to_string(),
details: vec![redirect],
};
let details = bytes::Bytes::from_owner(status.encode_to_vec());
TonicStatus::with_details(TonicCode::Aborted, "redirect", details)
}
pub(super) fn redirect_error(routing: &str) -> Error {
gaxi::grpc::from_status::to_gax_error(redirect_status(routing))
}
pub(super) fn permanent_error() -> Error {
Error::service(
Status::default()
.set_code(Code::PermissionDenied)
.set_message("uh-oh"),
)
}
pub(super) fn transient_error() -> Error {
Error::service(
Status::default()
.set_code(Code::Unavailable)
.set_message("try-again"),
)
}
pub(super) fn test_options() -> RequestOptions {
let mut options = RequestOptions::new();
options.backoff_policy = Arc::new(test_backoff());
options
}
fn test_backoff() -> impl google_cloud_gax::backoff_policy::BackoffPolicy {
use std::time::Duration;
google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder::new()
.with_initial_delay(Duration::from_micros(1))
.with_maximum_delay(Duration::from_micros(1))
.build()
.expect("a valid backoff policy")
}
pub(super) fn proto_range(offset: i64, length: i64) -> ProtoRange {
ProtoRange {
read_offset: offset,
read_length: length,
..ProtoRange::default()
}
}
pub(super) fn proto_range_id(offset: i64, length: i64, id: i64) -> ProtoRange {
let mut range = proto_range(offset, length);
range.read_id = id;
range
}
}