use super::redirect::handle_redirect;
use super::retry_redirect::RetryRedirect;
use super::{Client, TonicStreaming};
use crate::google::storage::v2::{
BidiReadObjectRequest, BidiReadObjectResponse, BidiReadObjectSpec, ReadRange as ProtoRange,
};
use crate::read_resume_policy::{ResumeQuery, ResumeResult};
use crate::request_options::RequestOptions;
use crate::storage::bidi::resume_redirect::ResumeRedirect;
use crate::storage::info::X_GOOG_API_CLIENT_HEADER;
use crate::{Error, Result};
use gaxi::grpc::Client as GrpcClient;
use gaxi::grpc::tonic::{Extensions, GrpcMethod, Status, Streaming};
use http::HeaderMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
/// The two halves of an open `BidiReadObject` stream.
///
/// `S` is the type of the response stream. It defaults to a tonic
/// `Streaming` of `BidiReadObjectResponse` messages.
#[derive(Debug)]
pub struct Connection<S = Streaming<BidiReadObjectResponse>> {
/// Sending half of the request channel for this stream.
pub tx: Sender<BidiReadObjectRequest>,
/// The response side of the stream.
pub rx: S,
}
impl<S> Connection<S> {
pub fn new(tx: Sender<BidiReadObjectRequest>, rx: S) -> Self {
Self { tx, rx }
}
}
/// Opens, and re-opens, `BidiReadObject` streams for one read spec.
///
/// `T` is the client used to start the underlying RPC. It defaults to the
/// gRPC client.
#[derive(Clone, Debug)]
pub struct Connector<T = GrpcClient> {
// Shared, mutable copy of the read spec. It is sent with every connection
// attempt, updated with the generation and read handle of the first response
// on a new stream, and handed to `handle_redirect` when an attempt fails.
spec: Arc<Mutex<BidiReadObjectSpec>>,
// Supplies the retry policy, backoff policy, retry throttler, per-attempt
// timeout, and read resume policy used by `connect()` and `reconnect()`.
options: RequestOptions,
// The client used to start the RPC.
client: T,
// Number of `reconnect()` calls so far; passed to the read resume policy.
reconnect_attempts: u32,
}
impl<T> Connector<T>
where
T: Client + Clone + Send + 'static,
<T as Client>::Stream: TonicStreaming,
{
/// Creates a connector for `spec`, with the reconnect counter at zero.
///
/// The spec is placed behind a shared mutex so connection attempts can
/// update it.
pub fn new(spec: BidiReadObjectSpec, options: RequestOptions, client: T) -> Self {
    let spec = Arc::new(Mutex::new(spec));
    Self {
        spec,
        options,
        client,
        reconnect_attempts: 0,
    }
}
pub async fn connect(
&mut self,
ranges: Vec<ProtoRange>,
) -> Result<(BidiReadObjectResponse, HeaderMap, Connection<T::Stream>)> {
let throttler = self.options.retry_throttler.clone();
let retry = Arc::new(RetryRedirect::new(self.options.retry_policy.clone()));
let backoff = self.options.backoff_policy.clone();
let client = self.client.clone();
let options = self.options.clone();
let spec = self.spec.clone();
let sleep = async |backoff| tokio::time::sleep(backoff).await;
let default_timeout = self.options.bidi_attempt_timeout;
let inner = async move |d: Option<Duration>| {
let attempt_timeout = std::cmp::min(default_timeout, d.unwrap_or(default_timeout));
let attempt =
Self::connect_attempt(client.clone(), spec.clone(), ranges.clone(), &options);
match tokio::time::timeout(attempt_timeout, attempt).await {
Ok(r) => r,
Err(e) => Err(Error::timeout(e)),
}
};
google_cloud_gax::retry_loop_internal::retry_loop(
inner, sleep, true, throttler, retry, backoff,
)
.await
}
/// Reacts to a failed stream and, if the resume policy allows it, opens a
/// new one.
///
/// `status` is first passed to `handle_redirect` together with the shared
/// spec; the resulting error is what the read resume policy evaluates. The
/// reconnect counter is incremented on every call, before the policy is
/// consulted.
///
/// Returns the result of `connect(ranges)` when the policy says to
/// continue; otherwise returns the error reported by the policy.
pub async fn reconnect(
    &mut self,
    status: Status,
    ranges: Vec<ProtoRange>,
) -> Result<(BidiReadObjectResponse, HeaderMap, Connection<T::Stream>)> {
    use crate::read_resume_policy::ReadResumePolicy;

    let error = handle_redirect(self.spec.clone(), status);
    self.reconnect_attempts += 1;

    let policy = ResumeRedirect::new(self.options.read_resume_policy());
    let query = ResumeQuery::new(self.reconnect_attempts);
    let verdict = policy.on_error(&query, error);
    match verdict {
        ResumeResult::Continue(_) => self.connect(ranges).await,
        ResumeResult::Exhausted(e) | ResumeResult::Permanent(e) => Err(e),
    }
}
async fn connect_attempt(
client: T,
spec: Arc<Mutex<BidiReadObjectSpec>>,
ranges: Vec<ProtoRange>,
options: &RequestOptions,
) -> Result<(BidiReadObjectResponse, HeaderMap, Connection<T::Stream>)> {
let request = BidiReadObjectRequest {
read_object_spec: Some((*spec.lock().expect("never poisoned")).clone()),
read_ranges: ranges,
};
let bucket_name = request
.read_object_spec
.as_ref()
.map(|s| s.bucket.as_str())
.unwrap_or_default();
if bucket_name
.strip_prefix("projects/_/buckets/")
.is_none_or(|x| x.is_empty())
{
use google_cloud_gax::error::binding::*;
let problem = SubstitutionFail::MismatchExpecting(
bucket_name.to_string(),
"projects/_/buckets/*",
);
let mismatch = SubstitutionMismatch {
field_name: "bucket",
problem,
};
let mismatch = PathMismatch {
subs: vec![mismatch],
};
let mismatch = BindingError {
paths: vec![mismatch],
};
return Err(crate::Error::binding(mismatch));
}
let x_goog_request_params = request
.read_object_spec
.iter()
.flat_map(|s| s.routing_token.iter())
.fold(format!("bucket={bucket_name}"), |s, token| {
s + &format!("&routing_token={token}")
});
let (tx, rx) = tokio::sync::mpsc::channel::<BidiReadObjectRequest>(100);
tx.send(request.clone()).await.map_err(Error::io)?;
let extensions = {
let mut e = Extensions::new();
e.insert(GrpcMethod::new(
"google.storage.v2.Storage",
"BidiReadObject",
));
e
};
let path =
http::uri::PathAndQuery::from_static("/google.storage.v2.Storage/BidiReadObject");
let response = client
.start(
extensions,
path,
rx,
options,
&X_GOOG_API_CLIENT_HEADER,
&x_goog_request_params,
)
.await?;
let response = match response {
Ok(r) => r,
Err(status) => return Err(handle_redirect(spec, status)),
};
let (metadata, mut stream, _) = response.into_parts();
let headers = metadata.into_headers();
match stream.next_message().await {
Ok(Some(m)) => {
let mut guard = spec.lock().expect("never poisoned");
if let Some(generation) = m.metadata.as_ref().map(|o| o.generation) {
guard.generation = generation;
}
if m.read_handle.is_some() {
guard.read_handle = m.read_handle.clone();
}
Ok((m, headers, Connection::new(tx, stream)))
}
Ok(None) => Err(Error::io("bidi_read_object stream closed before start")),
Err(status) => Err(handle_redirect(spec, status)),
}
}
}
#[cfg(test)]
mod tests {
use super::super::mocks::{MockTestClient, SharedMockClient};
use super::super::tests::{permanent_error, redirect_handle, redirect_status, test_options};
use super::*;
use crate::google::storage::v2::{BidiReadHandle, Object, ObjectRangeData};
use crate::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
use anyhow::Result;
use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
use google_cloud_auth::credentials::{Credentials, anonymous::Builder as Anonymous};
use google_cloud_gax::error::binding::{BindingError, SubstitutionFail};
use google_cloud_gax::retry_policy::NeverRetry;
use static_assertions::assert_impl_all;
use std::error::Error as _;
use std::sync::Arc;
use test_case::test_case;
/// Compile-time check of the auto and derived traits of the default
/// `Connector`.
#[test]
fn assertions() {
    assert_impl_all!(Connector: Send, Sync);
    assert_impl_all!(Connector: Clone, std::fmt::Debug);
}
/// Connecting through a real gRPC client pointed at an unusable endpoint
/// fails with a connect error. Also verifies the `connect()` future is `Send`.
#[tokio::test]
async fn bad_endpoint() -> Result<()> {
    // Only exists to require `Send` at compile time.
    fn assert_send<F: Send>(_: &F) {}

    let mut options = test_options();
    options.retry_policy = Arc::new(NeverRetry);
    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-only-bucket".into(),
        object: "test-only-object".into(),
        ..Default::default()
    };

    let mut config = gaxi::options::ClientConfig::default();
    config.cred = Some(test_credentials());
    let client = GrpcClient::new(config, "http://127.0.0.1:1").await?;

    let mut connector = Connector::new(spec, options, client);
    let pending = connector.connect(Vec::new());
    assert_send(&pending);
    let err = pending.await.unwrap_err();
    assert!(err.is_connect(), "{err:?}");
    Ok(())
}
/// A bucket name that does not match `projects/_/buckets/*` is rejected with
/// a binding error naming the `bucket` field, and the client is never called.
#[tokio::test]
#[test_case("")]
#[test_case("my-bucket")]
async fn binding_error(bucket_name: &str) -> Result<()> {
    let mut mock = MockTestClient::new();
    mock.expect_start().never();
    let spec = BidiReadObjectSpec {
        bucket: bucket_name.to_string(),
        object: "object".into(),
        ..Default::default()
    };
    let mut connector = Connector::new(spec, test_options(), SharedMockClient::new(mock));

    let err = connector.connect(Vec::new()).await.unwrap_err();
    assert!(err.is_binding(), "{err:?}");
    let source = err.source().and_then(|e| e.downcast_ref::<BindingError>());
    assert!(matches!(source, Some(BindingError { .. })), "{err:?}");

    // Flatten every substitution mismatch once, then inspect it twice.
    let subs = source
        .iter()
        .flat_map(|binding| binding.paths.iter())
        .flat_map(|path| path.subs.iter())
        .collect::<Vec<_>>();

    let fields = subs.iter().map(|sub| sub.field_name).collect::<Vec<_>>();
    assert_eq!(fields, vec!["bucket"], "{err:?}");

    let first_problem = subs.first().map(|sub| &sub.problem);
    assert!(
        matches!(
            first_problem,
            Some(SubstitutionFail::MismatchExpecting(n, p)) if n == bucket_name && *p == "projects/_/buckets/*"
        ),
        "{err:?}"
    );
    Ok(())
}
/// A permanent error from `start()` is surfaced to the caller, and the
/// request queued for that attempt carries the spec and the ranges.
#[tokio::test]
async fn start_error() -> Result<()> {
    let ranges = vec![
        ProtoRange {
            read_id: 123,
            read_offset: 100,
            read_length: 200,
        },
        ProtoRange {
            read_id: 234,
            read_offset: 500,
            read_length: 100,
        },
    ];
    // Keeps the request receivers handed to the mock so the test can read
    // what the connector queued.
    let receivers = Arc::new(Mutex::new(Vec::new()));
    let save = receivers.clone();
    let mut mock = MockTestClient::new();
    mock.expect_start()
        .return_once(move |extensions, path, rx, _options, header, params| {
            assert!(
                matches!(extensions.get::<GrpcMethod>(), Some(m) if m.service() == "google.storage.v2.Storage" && m.method() == "BidiReadObject")
            );
            assert_eq!(path.path(), "/google.storage.v2.Storage/BidiReadObject");
            assert_eq!(header, *X_GOOG_API_CLIENT_HEADER);
            assert_eq!(params, "bucket=projects/_/buckets/test-bucket");
            save.lock().expect("never poisoned").push(rx);
            Err(permanent_error())
        });
    let client = SharedMockClient::new(mock);
    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        ..BidiReadObjectSpec::default()
    };
    let mut connector = Connector::new(spec, test_options(), client);
    let err = connector.connect(ranges.clone()).await.unwrap_err();
    assert!(err.status().is_some(), "{err:?}");

    let mut rx = {
        let mut guard = receivers.lock().expect("never poisoned");
        let rx = guard.pop().expect("at least one receiver");
        // Format the guard, not the mutex: while the lock is held the
        // mutex's `Debug` output is just `<locked>`.
        assert!(guard.is_empty(), "{guard:?}");
        rx
    };
    let first = rx.recv().await.expect("non-empty request");
    assert_eq!(
        first.read_object_spec.as_ref().map(|s| s.bucket.as_str()),
        Some("projects/_/buckets/test-bucket")
    );
    assert_eq!(
        first.read_object_spec.as_ref().map(|s| s.object.as_str()),
        Some("test-object")
    );
    assert_eq!(first.read_ranges, ranges);
    Ok(())
}
/// With a routing token in the spec, the request parameters carry both the
/// bucket and the token, and the queued request carries the full spec.
#[tokio::test]
async fn start_error_with_routing() -> Result<()> {
    let ranges = vec![
        ProtoRange {
            read_id: 123,
            read_offset: 100,
            read_length: 200,
        },
        ProtoRange {
            read_id: 234,
            read_offset: 500,
            read_length: 100,
        },
    ];
    let handle = BidiReadHandle {
        handle: bytes::Bytes::from_static(b"test-handle"),
    };
    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        generation: 345678,
        read_handle: Some(handle.clone()),
        routing_token: Some("test-routing-token".to_string()),
        ..Default::default()
    };

    // Receivers handed to the mock, kept so the queued request can be read.
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = captured.clone();
    let mut mock = MockTestClient::new();
    mock.expect_start()
        .return_once(move |extensions, path, rx, _options, header, params| {
            assert!(
                matches!(
                    extensions.get::<GrpcMethod>(),
                    Some(m) if m.service() == "google.storage.v2.Storage" && m.method() == "BidiReadObject"
                )
            );
            assert_eq!(path.path(), "/google.storage.v2.Storage/BidiReadObject");
            assert_eq!(header, *X_GOOG_API_CLIENT_HEADER);
            // Compare the parameters without depending on their order.
            let mut pairs = params.split('&').collect::<Vec<_>>();
            pairs.sort();
            assert_eq!(
                pairs,
                vec![
                    "bucket=projects/_/buckets/test-bucket",
                    "routing_token=test-routing-token"
                ]
            );
            sink.lock().expect("never poisoned").push(rx);
            Err(permanent_error())
        });

    let mut connector = Connector::new(spec, test_options(), SharedMockClient::new(mock));
    let err = connector.connect(ranges.clone()).await.unwrap_err();
    assert!(err.status().is_some(), "{err:?}");

    let mut rx = {
        let mut pending = captured.lock().expect("never poisoned");
        let only = pending.pop().expect("at least one receiver");
        assert!(pending.is_empty(), "{pending:?}");
        only
    };
    let first = rx.recv().await.expect("non-empty request");
    let sent = first.read_object_spec.as_ref();
    assert_eq!(
        sent.map(|s| s.bucket.as_str()),
        Some("projects/_/buckets/test-bucket")
    );
    assert_eq!(sent.map(|s| s.object.as_str()), Some("test-object"));
    assert_eq!(sent.map(|s| s.generation), Some(345678));
    assert_eq!(sent.and_then(|s| s.read_handle.as_ref()), Some(&handle));
    assert_eq!(first.read_ranges, ranges);
    Ok(())
}
/// A redirect on the first attempt updates the spec; the second attempt sends
/// the updated spec and then fails with a permanent error.
#[tokio::test]
async fn start_redirect_then_error() -> Result<()> {
    let receivers = Arc::new(Mutex::new(Vec::new()));
    let mut seq = mockall::Sequence::new();
    let mut mock = MockTestClient::new();

    let first_sink = receivers.clone();
    mock.expect_start()
        .times(1)
        .in_sequence(&mut seq)
        .returning(move |_, _, rx, _, _, _| {
            first_sink.lock().expect("never poisoned").push(rx);
            Ok(Err(redirect_status("r1")))
        });
    let second_sink = receivers.clone();
    mock.expect_start()
        .times(1)
        .in_sequence(&mut seq)
        .returning(move |_, _, rx, _, _, _| {
            second_sink.lock().expect("never poisoned").push(rx);
            Err(permanent_error())
        });

    let original = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        ..Default::default()
    };
    let redirected = BidiReadObjectSpec {
        routing_token: Some("r1".to_string()),
        read_handle: Some(redirect_handle()),
        ..original.clone()
    };

    let mut connector = Connector::new(
        original.clone(),
        test_options(),
        SharedMockClient::new(mock),
    );
    let err = connector.connect(Vec::new()).await.unwrap_err();
    assert_eq!(err.status(), permanent_error().status(), "{err:?}");

    let current = connector.spec.lock().expect("never poisoned").clone();
    assert_eq!(current.routing_token.as_deref(), Some("r1"));
    assert_eq!(current.read_handle, Some(redirect_handle()));

    // Receivers come off the stack newest first.
    let take_last = || {
        receivers
            .lock()
            .expect("never poisoned")
            .pop()
            .expect("at least two receiver")
    };

    // Second attempt: carries the routing token and handle from the redirect.
    let mut rx = take_last();
    let sent = rx.recv().await.expect("at least one request sent");
    assert_eq!(sent.read_object_spec, Some(redirected));

    // First attempt: carries the spec exactly as given to the connector.
    let mut rx = take_last();
    let sent = rx.recv().await.expect("at least one request sent");
    assert_eq!(sent.read_object_spec, Some(original));
    Ok(())
}
/// Redirects may arrive both when starting the RPC and as the first item on
/// an opened stream. The spec ends up with the routing token of the last
/// redirect, and the final permanent error is returned.
#[tokio::test]
async fn start_redirect_open_with_redirect_then_error() -> Result<()> {
    // The second attempt opens this stream, which yields a redirect and
    // then closes.
    let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
    tx.send(Err(redirect_status("r2"))).await?;
    drop(tx);
    let stream = TonicResponse::from(rx);

    let mut seq = mockall::Sequence::new();
    let mut mock = MockTestClient::new();
    mock.expect_start()
        .times(1)
        .in_sequence(&mut seq)
        .returning(|_, _, _, _, _, _| Ok(Err(redirect_status("r1"))));
    mock.expect_start()
        .times(1)
        .in_sequence(&mut seq)
        .return_once(move |_, _, _, _, _, _| Ok(Ok(stream)));
    mock.expect_start()
        .times(1)
        .in_sequence(&mut seq)
        .return_once(move |_, _, _, _, _, _| Err(permanent_error()));

    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        ..Default::default()
    };
    let mut connector = Connector::new(spec, test_options(), SharedMockClient::new(mock));
    let err = connector.connect(Vec::new()).await.unwrap_err();
    assert_eq!(err.status(), permanent_error().status(), "{err:?}");

    let current = connector.spec.lock().expect("never poisoned").clone();
    assert_eq!(current.routing_token.as_deref(), Some("r2"));
    assert_eq!(current.read_handle, Some(redirect_handle()));
    Ok(())
}
/// A stream that closes before producing any message does not end the
/// connect: the mock expects a second `start()` call, whose stream delivers
/// the initial response.
#[tokio::test]
async fn start_immediately_closed() -> Result<()> {
    let open_handle = BidiReadHandle {
        handle: bytes::Bytes::from_static(b"test-handle-open"),
    };
    let initial = BidiReadObjectResponse {
        metadata: Some(Object {
            bucket: "projects/_/buckets/test-bucket".into(),
            name: "test-object".into(),
            generation: 123456,
            ..Default::default()
        }),
        read_handle: Some(open_handle.clone()),
        ..Default::default()
    };

    // First stream: closed right away, nothing to read.
    let (closed_tx, closed_rx) =
        tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
    drop(closed_tx);
    let closed_stream = TonicResponse::from(closed_rx);

    // Second stream: has the initial response waiting.
    let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
    tx.send(Ok(initial.clone())).await?;
    let good_stream = TonicResponse::from(rx);

    let mut seq = mockall::Sequence::new();
    let mut mock = MockTestClient::new();
    mock.expect_start()
        .times(1)
        .in_sequence(&mut seq)
        .return_once(move |_, _, _, _, _, _| Ok(Ok(closed_stream)));
    mock.expect_start()
        .times(1)
        .in_sequence(&mut seq)
        .return_once(move |_, _, _, _, _, _| Ok(Ok(good_stream)));

    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        ..Default::default()
    };
    let mut connector = Connector::new(spec, test_options(), SharedMockClient::new(mock));
    let (response, _headers, _connection) = connector.connect(Vec::new()).await?;
    assert_eq!(response, initial);

    let current = connector.spec.lock().expect("never poisoned").clone();
    assert!(current.routing_token.is_none(), "{current:?}");
    assert_eq!(current.generation, 123456, "{current:?}");
    assert_eq!(current.read_handle, Some(open_handle));
    drop(tx);
    Ok(())
}
/// A successful connect returns the first response and records its
/// generation and read handle in the connector's spec.
#[tokio::test]
async fn start_success() -> Result<()> {
    let open_handle = BidiReadHandle {
        handle: bytes::Bytes::from_static(b"test-handle-open"),
    };
    let initial = BidiReadObjectResponse {
        metadata: Some(Object {
            bucket: "projects/_/buckets/test-bucket".into(),
            name: "test-object".into(),
            generation: 123456,
            ..Default::default()
        }),
        read_handle: Some(open_handle.clone()),
        ..Default::default()
    };

    // Preload the response stream with the initial response.
    let (tx, rx) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
    tx.send(Ok(initial.clone())).await?;
    let stream = TonicResponse::from(rx);

    let mut mock = MockTestClient::new();
    mock.expect_start()
        .return_once(move |_, _, _, _, _, _| Ok(Ok(stream)));

    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        ..Default::default()
    };
    let mut connector = Connector::new(spec, test_options(), SharedMockClient::new(mock));
    let (response, _headers, _connection) = connector.connect(Vec::new()).await?;
    assert_eq!(response, initial);

    let current = connector.spec.lock().expect("never poisoned").clone();
    assert!(current.routing_token.is_none(), "{current:?}");
    assert_eq!(current.generation, 123456, "{current:?}");
    assert_eq!(current.read_handle, Some(open_handle));
    drop(tx);
    Ok(())
}
// Connects successfully, then reconnects after a redirect status. The second
// stream delivers a response with range data, and the spec picks up the
// routing token and read handle from the redirect.
#[tokio::test]
async fn start_success_then_reconnect() -> Result<()> {
// One response channel per expected `start()` call.
let (tx1, rx1) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
let stream1 = TonicResponse::from(rx1);
let (tx2, rx2) = tokio::sync::mpsc::channel::<TonicResult<BidiReadObjectResponse>>(5);
let stream2 = TonicResponse::from(rx2);
let mut seq = mockall::Sequence::new();
let mut mock = MockTestClient::new();
// First call serves `connect()`, second call serves `reconnect()`.
mock.expect_start()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _, _, _, _, _| Ok(Ok(stream1)));
mock.expect_start()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _, _, _, _, _| Ok(Ok(stream2)));
let client = SharedMockClient::new(mock);
let spec = BidiReadObjectSpec {
bucket: "projects/_/buckets/test-bucket".into(),
object: "test-object".into(),
..BidiReadObjectSpec::default()
};
let i1 = BidiReadObjectResponse {
metadata: Some(Object {
bucket: "projects/_/buckets/test-bucket".into(),
name: "test-object".into(),
generation: 123456,
..Object::default()
}),
read_handle: Some(BidiReadHandle {
handle: bytes::Bytes::from_static(b"test-handle-open"),
}),
..BidiReadObjectResponse::default()
};
// Preload the first stream so `connect()` finds its initial response.
tx1.send(Ok(i1.clone())).await?;
let mut connector = Connector::new(spec, test_options(), client);
let (response, _h, _connection) = connector.connect(Vec::new()).await?;
assert_eq!(response, i1);
// The spec now holds the generation and read handle from the first response.
let got = connector.spec.lock().expect("never poisoned").clone();
assert!(got.routing_token.is_none(), "{got:?}");
assert_eq!(got.generation, 123456, "{got:?}");
assert_eq!(
got.read_handle.map(|h| h.handle.clone()),
Some(bytes::Bytes::from_static(b"test-handle-open"))
);
drop(tx1);
let ranges = vec![
ProtoRange {
read_id: 1,
..ProtoRange::default()
},
ProtoRange {
read_id: 2,
..ProtoRange::default()
},
];
// The second stream's first response has no read handle, only range data.
let i2 = BidiReadObjectResponse {
metadata: Some(Object {
bucket: "projects/_/buckets/test-bucket".into(),
name: "test-object".into(),
generation: 123456,
..Object::default()
}),
object_data_ranges: ranges
.iter()
.map(|range| ObjectRangeData {
read_range: Some(*range),
..ObjectRangeData::default()
})
.collect(),
..BidiReadObjectResponse::default()
};
tx2.send(Ok(i2.clone())).await?;
let (response, _h, _connection) = connector
.reconnect(redirect_status("r2"), ranges.clone())
.await?;
assert_eq!(response, i2);
// The routing token and read handle now match the redirect; the generation
// is unchanged.
let got = connector.spec.lock().expect("never poisoned").clone();
assert_eq!(got.routing_token.as_deref(), Some("r2"), "{got:?}");
assert_eq!(got.generation, 123456, "{got:?}");
assert_eq!(got.read_handle, Some(redirect_handle()), "{got:?}");
drop(tx2);
Ok(())
}
/// `reconnect()` returns an error carrying a status when the `start()` call
/// fails with a permanent error.
#[tokio::test]
async fn reconnect_permanent() -> Result<()> {
    let mut mock = MockTestClient::new();
    mock.expect_start()
        .return_once(|_, _, _, _, _, _| Err(permanent_error()));
    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        ..Default::default()
    };
    let mut connector = Connector::new(spec, test_options(), SharedMockClient::new(mock));

    let result = connector
        .reconnect(Status::permission_denied("uh-oh"), Vec::new())
        .await;
    let err = result.unwrap_err();
    assert!(err.status().is_some(), "{err:?}");
    Ok(())
}
/// With a resume policy limited to one attempt, `reconnect()` fails without
/// ever calling `start()`.
#[tokio::test]
async fn reconnect_exhausted() -> Result<()> {
    let mut mock = MockTestClient::new();
    mock.expect_start().never();

    let mut options = test_options();
    options.set_read_resume_policy(Arc::new(AlwaysResume.with_attempt_limit(1)));
    let spec = BidiReadObjectSpec {
        bucket: "projects/_/buckets/test-bucket".into(),
        object: "test-object".into(),
        ..Default::default()
    };
    let mut connector = Connector::new(spec, options, SharedMockClient::new(mock));

    let result = connector
        .reconnect(Status::unavailable("try-again"), Vec::new())
        .await;
    let err = result.unwrap_err();
    assert!(err.status().is_some(), "{err:?}");
    Ok(())
}
/// Builds anonymous credentials for tests.
fn test_credentials() -> Credentials {
    let builder = Anonymous::new();
    builder.build()
}
}