mod api_error;
mod api_response_stream;
mod connection_client;
mod connection_registry;
mod grpc;
#[cfg(not(target_arch = "wasm32"))]
mod segment_chunk_provider;
#[cfg(not(target_arch = "wasm32"))]
pub use self::segment_chunk_provider::SegmentChunkProvider;
pub use self::api_error::{ApiError, ApiErrorKind, ApiResult};
pub use self::api_response_stream::ApiResponseStream;
pub use self::connection_client::{
ConnectionClient, FetchChunksResponseStream, GenericConnectionClient, SegmentQueryParams,
};
pub use self::connection_registry::{
ClientCredentialsError, ConnectionRegistry, ConnectionRegistryHandle, CredentialSource,
Credentials, SourcedCredentials,
};
pub use self::grpc::{
ChunksWithSegment, RedapClient, RedapClientInner, StreamingOptions, channel,
fetch_chunks_response_to_chunk_and_segment_id, stream_blueprint_and_segment_from_server,
};
pub use opentelemetry::TraceId;
const MAX_DECODING_MESSAGE_SIZE: usize = u32::MAX as usize;
pub const FETCH_CHUNKS_DEADLINE: std::time::Duration = std::time::Duration::from_secs(300);
const GRPC_RESPONSE_TRACEID_HEADER: &str = "x-request-trace-id";
pub fn extract_trace_id(metadata: &tonic::metadata::MetadataMap) -> Option<opentelemetry::TraceId> {
let s = metadata.get(GRPC_RESPONSE_TRACEID_HEADER)?.to_str().ok()?;
opentelemetry::TraceId::from_hex(s).ok()
}
#[derive(Debug)]
pub struct TonicStatusError(Box<tonic::Status>);
const _: () = assert!(
std::mem::size_of::<TonicStatusError>() <= 32,
"Error type is too large. Try to reduce its size by boxing some of its variants.",
);
impl AsRef<tonic::Status> for TonicStatusError {
#[inline]
fn as_ref(&self) -> &tonic::Status {
&self.0
}
}
impl TonicStatusError {
pub fn into_inner(self) -> tonic::Status {
*self.0
}
}
impl std::fmt::Display for TonicStatusError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fmt_tonic_status(f, &self.0)
}
}
fn fmt_tonic_status(f: &mut std::fmt::Formatter<'_>, status: &tonic::Status) -> std::fmt::Result {
if status.message().is_empty() {
write!(f, "gRPC error")?;
} else {
write!(f, "{}", status.message())?;
}
if status.code() != tonic::Code::Unknown {
write!(f, " ({})", status.code())?;
}
if !status.metadata().is_empty() {
write!(
f,
"{} metadata: {:?}",
re_error::DETAILS_SEPARATOR,
status.metadata().as_ref()
)?;
}
Ok(())
}
impl From<tonic::Status> for TonicStatusError {
fn from(value: tonic::Status) -> Self {
Self(Box::new(value))
}
}
impl std::error::Error for TonicStatusError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}
pub async fn with_retry<T, F, Fut>(req_name: &str, f: F) -> ApiResult<T>
where
F: Fn() -> Fut,
Fut: Future<Output = ApiResult<T>>,
{
use tracing::Instrument as _;
let span = tracing::debug_span!(
"with_retry",
otel.name = format!("{req_name} with_retry"),
req_name,
);
with_retry_inner(req_name, f).instrument(span).await
}
async fn with_retry_inner<T, F, Fut>(req_name: &str, f: F) -> ApiResult<T>
where
F: Fn() -> Fut,
Fut: Future<Output = ApiResult<T>>,
{
const MAX_ATTEMPTS: usize = 5;
let mut backoff_gen = re_backoff::BackoffGenerator::new(
std::time::Duration::from_millis(100),
std::time::Duration::from_secs(3),
)
.expect("base is less than max");
let mut attempts = 1;
let mut last_retryable_err = None;
while attempts <= MAX_ATTEMPTS {
use tracing::Instrument as _;
let res = f()
.instrument(tracing::debug_span!("attempt", attempts))
.await;
match res {
Err(err) if err.kind.is_retryable() => {
last_retryable_err = Some(err);
let backoff = backoff_gen.gen_next();
tracing::trace!(
attempts,
max_attempts = MAX_ATTEMPTS,
?backoff,
"{req_name} failed with retryable gRPC error, retrying after backoff"
);
backoff.sleep().await;
}
Err(err) => {
tracing::trace!(
attempts,
"{req_name} failed with non-retryable error: {err}"
);
return Err(err);
}
Ok(value) => {
tracing::trace!(attempts, "{req_name} succeeded");
return Ok(value);
}
}
attempts += 1;
}
tracing::trace!(
attempts,
max_attempts = MAX_ATTEMPTS,
"{req_name} failed after max retries, giving up"
);
Err(last_retryable_err.expect("bug: this should not be None if we reach here"))
}