use std::{
collections::HashSet,
env::VarError,
fmt,
num::NonZeroU32,
ops::{Deref, RangeTo},
pin::Pin,
str::FromStr,
time::Duration,
};
use bytes::Bytes;
use http::{
header::HeaderValue,
uri::{Authority, Scheme},
};
use rand::RngExt;
use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
pub use s2_common::caps::RECORD_BATCH_MAX;
pub use s2_common::types::ValidationError;
pub use s2_common::types::access::AccessTokenId;
pub use s2_common::types::access::AccessTokenIdPrefix;
pub use s2_common::types::access::AccessTokenIdStartAfter;
pub use s2_common::types::basin::BasinName;
pub use s2_common::types::basin::BasinNamePrefix;
pub use s2_common::types::basin::BasinNameStartAfter;
pub use s2_common::types::stream::StreamName;
pub use s2_common::types::stream::StreamNamePrefix;
pub use s2_common::types::stream::StreamNameStartAfter;
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
use secrecy::SecretString;
use crate::api::{ApiError, ApiErrorResponse};
/// A point in time guaranteed to be representable as RFC 3339.
///
/// Wraps [`time::OffsetDateTime`]; construction via [`TryFrom`] or [`FromStr`]
/// validates RFC 3339 compatibility up front, so [`fmt::Display`] cannot fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S2DateTime(time::OffsetDateTime);

impl TryFrom<time::OffsetDateTime> for S2DateTime {
    type Error = ValidationError;

    /// Accepts the datetime only if it can be formatted as RFC 3339.
    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
        // Formatting is used purely as validation; the rendered string is discarded.
        match dt.format(&time::format_description::well_known::Rfc3339) {
            Ok(_) => Ok(Self(dt)),
            Err(e) => Err(ValidationError(format!("not a valid RFC 3339 datetime: {e}"))),
        }
    }
}

impl From<S2DateTime> for time::OffsetDateTime {
    fn from(dt: S2DateTime) -> Self {
        let S2DateTime(inner) = dt;
        inner
    }
}

impl FromStr for S2DateTime {
    type Err = ValidationError;

    /// Parses an RFC 3339 datetime string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339) {
            Ok(dt) => Ok(Self(dt)),
            Err(e) => Err(ValidationError(format!("not a valid RFC 3339 datetime: {e}"))),
        }
    }
}

impl fmt::Display for S2DateTime {
    /// Renders as RFC 3339; construction guarantees this cannot fail.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = self
            .0
            .format(&time::format_description::well_known::Rfc3339)
            .expect("RFC3339 formatting should not fail for S2DateTime");
        f.write_str(&rendered)
    }
}
/// How a basin is addressed in request URIs.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// The basin name is placed as a subdomain of this parent zone
    /// (parsed from an endpoint written with a `{basin}.` prefix).
    ParentZone(Authority),
    /// Requests go directly to this authority.
    Direct(Authority),
}
/// Endpoint for account-level operations.
#[derive(Debug, Clone)]
pub struct AccountEndpoint {
    scheme: Scheme,
    authority: Authority,
}

impl AccountEndpoint {
    /// Parses an endpoint string; the scheme defaults to HTTPS when absent.
    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
        endpoint.parse()
    }
}

impl FromStr for AccountEndpoint {
    type Err = ValidationError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Split off an optional "scheme://" prefix; the remainder is the authority.
        let (scheme, rest) = if let Some(idx) = s.find("://") {
            let parsed: Scheme = s[..idx]
                .parse()
                .map_err(|_| "invalid account endpoint scheme".to_string())?;
            (parsed, &s[idx + 3..])
        } else {
            (Scheme::HTTPS, s)
        };
        let authority = rest
            .parse()
            .map_err(|e| format!("invalid account endpoint authority: {e}"))?;
        Ok(Self { scheme, authority })
    }
}
#[derive(Debug, Clone)]
pub struct BasinEndpoint {
scheme: Scheme,
authority: BasinAuthority,
}
impl BasinEndpoint {
pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
endpoint.parse()
}
}
impl FromStr for BasinEndpoint {
type Err = ValidationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (scheme, authority) = match s.find("://") {
Some(idx) => {
let scheme: Scheme = s[..idx]
.parse()
.map_err(|_| "invalid basin endpoint scheme".to_string())?;
(scheme, &s[idx + 3..])
}
None => (Scheme::HTTPS, s),
};
let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
BasinAuthority::ParentZone(
authority
.parse()
.map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
)
} else {
BasinAuthority::Direct(
authority
.parse()
.map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
)
};
Ok(Self { scheme, authority })
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct S2Endpoints {
pub(crate) scheme: Scheme,
pub(crate) account_authority: Authority,
pub(crate) basin_authority: BasinAuthority,
}
impl S2Endpoints {
pub fn new(
account_endpoint: AccountEndpoint,
basin_endpoint: BasinEndpoint,
) -> Result<Self, ValidationError> {
if account_endpoint.scheme != basin_endpoint.scheme {
return Err("account and basin endpoints must have the same scheme".into());
}
Ok(Self {
scheme: account_endpoint.scheme,
account_authority: account_endpoint.authority,
basin_authority: basin_endpoint.authority,
})
}
pub fn from_env() -> Result<Self, ValidationError> {
let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
Ok(endpoint) => endpoint.parse()?,
Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
Err(VarError::NotUnicode(_)) => {
return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
}
};
let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
Ok(endpoint) => endpoint.parse()?,
Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
Err(VarError::NotUnicode(_)) => {
return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
}
};
if account_endpoint.scheme != basin_endpoint.scheme {
return Err(
"S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
);
}
Ok(Self {
scheme: account_endpoint.scheme,
account_authority: account_endpoint.authority,
basin_authority: basin_endpoint.authority,
})
}
pub(crate) fn for_aws() -> Self {
Self {
scheme: Scheme::HTTPS,
account_authority: "aws.s2.dev".try_into().expect("valid authority"),
basin_authority: BasinAuthority::ParentZone(
"b.aws.s2.dev".try_into().expect("valid authority"),
),
}
}
}
/// Compression algorithm applied to stream traffic.
#[derive(Debug, Clone, Copy)]
pub enum Compression {
    /// No compression.
    None,
    /// gzip.
    Gzip,
    /// Zstandard.
    Zstd,
}

impl From<Compression> for CompressionAlgorithm {
    fn from(value: Compression) -> Self {
        match value {
            Compression::Zstd => CompressionAlgorithm::Zstd,
            Compression::Gzip => CompressionAlgorithm::Gzip,
            Compression::None => CompressionAlgorithm::None,
        }
    }
}
/// Which append requests are eligible for automatic retry.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// Retry all failed appends.
    All,
    /// Retry only appends whose replay cannot produce side effects.
    NoSideEffects,
}

/// Retry behavior for requests, with backoff base-delay bounds.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct RetryConfig {
    /// Total attempts, including the initial one.
    pub max_attempts: NonZeroU32,
    /// Lower bound on the backoff base delay.
    pub min_base_delay: Duration,
    /// Upper bound on the backoff base delay.
    pub max_base_delay: Duration,
    /// Which appends may be retried.
    pub append_retry_policy: AppendRetryPolicy,
}

impl Default for RetryConfig {
    /// 3 attempts, 100ms–1s base delay, all appends retryable.
    fn default() -> Self {
        Self {
            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
            min_base_delay: Duration::from_millis(100),
            max_base_delay: Duration::from_secs(1),
            append_retry_policy: AppendRetryPolicy::All,
        }
    }
}

impl RetryConfig {
    /// Equivalent to [`RetryConfig::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of retries after the initial attempt.
    pub(crate) fn max_retries(&self) -> u32 {
        self.max_attempts.get() - 1
    }

    /// Sets the total number of attempts (initial attempt included).
    pub fn with_max_attempts(mut self, max_attempts: NonZeroU32) -> Self {
        self.max_attempts = max_attempts;
        self
    }

    /// Sets the minimum backoff base delay.
    pub fn with_min_base_delay(mut self, min_base_delay: Duration) -> Self {
        self.min_base_delay = min_base_delay;
        self
    }

    /// Sets the maximum backoff base delay.
    pub fn with_max_base_delay(mut self, max_base_delay: Duration) -> Self {
        self.max_base_delay = max_base_delay;
        self
    }

    /// Sets the append retry policy.
    pub fn with_append_retry_policy(mut self, append_retry_policy: AppendRetryPolicy) -> Self {
        self.append_retry_policy = append_retry_policy;
        self
    }
}
/// Client configuration: credentials, endpoints, timeouts, retry, compression.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct S2Config {
    pub(crate) access_token: SecretString,
    pub(crate) endpoints: S2Endpoints,
    pub(crate) connection_timeout: Duration,
    pub(crate) request_timeout: Duration,
    pub(crate) retry: RetryConfig,
    pub(crate) compression: Compression,
    pub(crate) user_agent: HeaderValue,
    pub(crate) insecure_skip_cert_verification: bool,
}

impl S2Config {
    /// Creates a config with the given access token and defaults: AWS
    /// endpoints, 3s connection / 5s request timeouts, default retry config,
    /// no compression, certificate verification enabled.
    pub fn new(access_token: impl Into<String>) -> Self {
        let user_agent = concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
            .parse()
            .expect("valid user agent");
        Self {
            access_token: access_token.into().into(),
            endpoints: S2Endpoints::for_aws(),
            connection_timeout: Duration::from_secs(3),
            request_timeout: Duration::from_secs(5),
            retry: RetryConfig::new(),
            compression: Compression::None,
            user_agent,
            insecure_skip_cert_verification: false,
        }
    }

    /// Overrides the service endpoints.
    pub fn with_endpoints(mut self, endpoints: S2Endpoints) -> Self {
        self.endpoints = endpoints;
        self
    }

    /// Overrides the connection timeout.
    pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> Self {
        self.connection_timeout = connection_timeout;
        self
    }

    /// Overrides the per-request timeout.
    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
        self.request_timeout = request_timeout;
        self
    }

    /// Overrides the retry configuration.
    pub fn with_retry(mut self, retry: RetryConfig) -> Self {
        self.retry = retry;
        self
    }

    /// Overrides the compression algorithm.
    pub fn with_compression(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Disables TLS certificate verification when `skip` is true.
    /// Intended for testing only.
    pub fn with_insecure_skip_cert_verification(mut self, skip: bool) -> Self {
        self.insecure_skip_cert_verification = skip;
        self
    }

    /// Overrides the `User-Agent` header; errors if the value is not a
    /// valid HTTP header value.
    #[doc(hidden)]
    #[cfg(feature = "_hidden")]
    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
        match user_agent.into().parse() {
            Ok(user_agent) => Ok(Self { user_agent, ..self }),
            Err(e) => Err(ValidationError(format!("invalid user agent: {e}"))),
        }
    }
}
/// One page of results from a paginated listing operation.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Items in this page.
    pub values: Vec<T>,
    /// Whether further pages remain.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from any collection convertible to `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values = values.into();
        Self { values, has_more }
    }
}
/// Storage class backing a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    /// Standard storage.
    Standard,
    /// Express storage.
    Express,
}

impl From<api::config::StorageClass> for StorageClass {
    fn from(value: api::config::StorageClass) -> Self {
        use api::config::StorageClass as Api;
        match value {
            Api::Express => Self::Express,
            Api::Standard => Self::Standard,
        }
    }
}

impl From<StorageClass> for api::config::StorageClass {
    fn from(value: StorageClass) -> Self {
        match value {
            StorageClass::Express => Self::Express,
            StorageClass::Standard => Self::Standard,
        }
    }
}
/// How long records are retained on a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Retain records up to this age, in seconds.
    Age(u64),
    /// Retain records indefinitely.
    Infinite,
}

impl From<api::config::RetentionPolicy> for RetentionPolicy {
    fn from(value: api::config::RetentionPolicy) -> Self {
        use api::config::RetentionPolicy as Api;
        match value {
            Api::Infinite(_) => Self::Infinite,
            Api::Age(secs) => Self::Age(secs),
        }
    }
}

impl From<RetentionPolicy> for api::config::RetentionPolicy {
    fn from(value: RetentionPolicy) -> Self {
        match value {
            RetentionPolicy::Infinite => {
                Self::Infinite(api::config::InfiniteRetention {})
            }
            RetentionPolicy::Age(secs) => Self::Age(secs),
        }
    }
}
/// How record timestamps are assigned on append.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    /// Prefer a client-supplied timestamp when present.
    ClientPrefer,
    /// Require a client-supplied timestamp.
    ClientRequire,
    /// Use the arrival time at the service.
    Arrival,
}

impl From<api::config::TimestampingMode> for TimestampingMode {
    fn from(value: api::config::TimestampingMode) -> Self {
        use api::config::TimestampingMode as Api;
        match value {
            Api::Arrival => Self::Arrival,
            Api::ClientRequire => Self::ClientRequire,
            Api::ClientPrefer => Self::ClientPrefer,
        }
    }
}

impl From<TimestampingMode> for api::config::TimestampingMode {
    fn from(value: TimestampingMode) -> Self {
        match value {
            TimestampingMode::Arrival => Self::Arrival,
            TimestampingMode::ClientRequire => Self::ClientRequire,
            TimestampingMode::ClientPrefer => Self::ClientPrefer,
        }
    }
}
/// Timestamping configuration for a stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct TimestampingConfig {
    /// Timestamping mode; `None` leaves it unset.
    pub mode: Option<TimestampingMode>,
    /// Uncapped-timestamps flag.
    pub uncapped: bool,
}

impl TimestampingConfig {
    /// Default config: mode unset, `uncapped` false.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the timestamping mode.
    pub fn with_mode(mut self, mode: TimestampingMode) -> Self {
        self.mode = Some(mode);
        self
    }

    /// Sets the uncapped flag.
    pub fn with_uncapped(mut self, uncapped: bool) -> Self {
        self.uncapped = uncapped;
        self
    }
}

impl From<api::config::TimestampingConfig> for TimestampingConfig {
    fn from(value: api::config::TimestampingConfig) -> Self {
        Self {
            // A missing API value maps to `false`.
            uncapped: value.uncapped.unwrap_or_default(),
            mode: value.mode.map(Into::into),
        }
    }
}

impl From<TimestampingConfig> for api::config::TimestampingConfig {
    fn from(value: TimestampingConfig) -> Self {
        Self {
            uncapped: Some(value.uncapped),
            mode: value.mode.map(Into::into),
        }
    }
}
/// Delete-on-empty configuration for a stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum stream age, in seconds, before an empty stream is deleted.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Default config with a zero minimum age.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the minimum age; sub-second precision is truncated.
    pub fn with_min_age(self, min_age: Duration) -> Self {
        let min_age_secs = min_age.as_secs();
        Self { min_age_secs }
    }
}
// Conversion from the wire/API representation.
impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
        Self {
            min_age_secs: value.min_age_secs,
        }
    }
}
// Conversion to the wire/API representation.
impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
    fn from(value: DeleteOnEmptyConfig) -> Self {
        Self {
            min_age_secs: value.min_age_secs,
        }
    }
}
/// Full configuration of a stream; unset fields leave defaults in place.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct StreamConfig {
    /// Storage class, if set.
    pub storage_class: Option<StorageClass>,
    /// Retention policy, if set.
    pub retention_policy: Option<RetentionPolicy>,
    /// Timestamping configuration, if set.
    pub timestamping: Option<TimestampingConfig>,
    /// Delete-on-empty configuration, if set.
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}

impl StreamConfig {
    /// Empty config with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the storage class.
    pub fn with_storage_class(mut self, storage_class: StorageClass) -> Self {
        self.storage_class = Some(storage_class);
        self
    }

    /// Sets the retention policy.
    pub fn with_retention_policy(mut self, retention_policy: RetentionPolicy) -> Self {
        self.retention_policy = Some(retention_policy);
        self
    }

    /// Sets the timestamping configuration.
    pub fn with_timestamping(mut self, timestamping: TimestampingConfig) -> Self {
        self.timestamping = Some(timestamping);
        self
    }

    /// Sets the delete-on-empty configuration.
    pub fn with_delete_on_empty(mut self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
        self.delete_on_empty = Some(delete_on_empty);
        self
    }
}

impl From<api::config::StreamConfig> for StreamConfig {
    fn from(value: api::config::StreamConfig) -> Self {
        Self {
            delete_on_empty: value.delete_on_empty.map(Into::into),
            timestamping: value.timestamping.map(Into::into),
            retention_policy: value.retention_policy.map(Into::into),
            storage_class: value.storage_class.map(Into::into),
        }
    }
}

impl From<StreamConfig> for api::config::StreamConfig {
    fn from(value: StreamConfig) -> Self {
        Self {
            delete_on_empty: value.delete_on_empty.map(Into::into),
            timestamping: value.timestamping.map(Into::into),
            retention_policy: value.retention_policy.map(Into::into),
            storage_class: value.storage_class.map(Into::into),
        }
    }
}
/// Configuration of a basin.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct BasinConfig {
    /// Stream config applied to streams created without an explicit one.
    pub default_stream_config: Option<StreamConfig>,
    /// Whether appending to a missing stream creates it.
    pub create_stream_on_append: bool,
    /// Whether reading from a missing stream creates it.
    pub create_stream_on_read: bool,
}

impl BasinConfig {
    /// Empty config: no default stream config, auto-create flags off.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the default stream configuration.
    pub fn with_default_stream_config(mut self, config: StreamConfig) -> Self {
        self.default_stream_config = Some(config);
        self
    }

    /// Sets the create-on-append flag.
    pub fn with_create_stream_on_append(mut self, create_stream_on_append: bool) -> Self {
        self.create_stream_on_append = create_stream_on_append;
        self
    }

    /// Sets the create-on-read flag.
    pub fn with_create_stream_on_read(mut self, create_stream_on_read: bool) -> Self {
        self.create_stream_on_read = create_stream_on_read;
        self
    }
}

impl From<api::config::BasinConfig> for BasinConfig {
    fn from(value: api::config::BasinConfig) -> Self {
        Self {
            create_stream_on_read: value.create_stream_on_read,
            create_stream_on_append: value.create_stream_on_append,
            default_stream_config: value.default_stream_config.map(Into::into),
        }
    }
}

impl From<BasinConfig> for api::config::BasinConfig {
    fn from(value: BasinConfig) -> Self {
        Self {
            create_stream_on_read: value.create_stream_on_read,
            create_stream_on_append: value.create_stream_on_append,
            default_stream_config: value.default_stream_config.map(Into::into),
        }
    }
}
/// Cloud scope a basin lives in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BasinScope {
    /// AWS `us-east-1`.
    AwsUsEast1,
}

impl From<api::basin::BasinScope> for BasinScope {
    fn from(value: api::basin::BasinScope) -> Self {
        match value {
            api::basin::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
        }
    }
}

impl From<BasinScope> for api::basin::BasinScope {
    fn from(value: BasinScope) -> Self {
        match value {
            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
        }
    }
}
/// Outcome of a create-or-reconfigure operation, carrying the resulting value.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CreateOrReconfigured<T> {
    /// The resource was newly created.
    Created(T),
    /// The resource already existed and was reconfigured.
    Reconfigured(T),
}
#[cfg(feature = "_hidden")]
impl<T> CreateOrReconfigured<T> {
    /// True when the resource was newly created.
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Reconfigured(_) => false,
        }
    }

    /// Unwraps the payload regardless of variant.
    pub fn into_inner(self) -> T {
        match self {
            Self::Created(inner) => inner,
            Self::Reconfigured(inner) => inner,
        }
    }
}
/// Input for creating a basin.
///
/// An idempotency token is generated at construction so that retried
/// requests built from the same input are deduplicated.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateBasinInput {
    /// Name of the basin to create.
    pub name: BasinName,
    /// Initial basin configuration, if any.
    pub config: Option<BasinConfig>,
    /// Scope to create the basin in, if any.
    pub scope: Option<BasinScope>,
    idempotency_token: String,
}

impl CreateBasinInput {
    /// Input with only a name; config and scope unset.
    pub fn new(name: BasinName) -> Self {
        Self {
            name,
            config: None,
            scope: None,
            idempotency_token: idempotency_token(),
        }
    }

    /// Sets the basin configuration.
    pub fn with_config(mut self, config: BasinConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Sets the basin scope.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}

impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
    fn from(value: CreateBasinInput) -> Self {
        let CreateBasinInput {
            name,
            config,
            scope,
            idempotency_token,
        } = value;
        let request = api::basin::CreateBasinRequest {
            basin: name,
            config: config.map(Into::into),
            scope: scope.map(Into::into),
        };
        (request, idempotency_token)
    }
}
/// Input for the create-or-reconfigure basin operation.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureBasinInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Reconfiguration to apply, if any.
    pub config: Option<BasinReconfiguration>,
    /// Scope for the basin, if any.
    pub scope: Option<BasinScope>,
}
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureBasinInput {
    /// Input with only a name; config and scope unset.
    pub fn new(name: BasinName) -> Self {
        Self {
            name,
            config: None,
            scope: None,
        }
    }

    /// Sets the reconfiguration to apply.
    pub fn with_config(mut self, config: BasinReconfiguration) -> Self {
        self.config = Some(config);
        self
    }

    /// Sets the basin scope.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureBasinInput>
    for (
        BasinName,
        Option<api::basin::CreateOrReconfigureBasinRequest>,
    )
{
    fn from(value: CreateOrReconfigureBasinInput) -> Self {
        let CreateOrReconfigureBasinInput { name, config, scope } = value;
        // Omit the request body entirely when there is nothing to change.
        let request = (config.is_some() || scope.is_some()).then(|| {
            api::basin::CreateOrReconfigureBasinRequest {
                config: config.map(Into::into),
                scope: scope.map(Into::into),
            }
        });
        (name, request)
    }
}
/// Parameters for listing one page of basins.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListBasinsInput {
    /// Only list basins whose name has this prefix.
    pub prefix: BasinNamePrefix,
    /// Only list basins whose name sorts after this value.
    pub start_after: BasinNameStartAfter,
    /// Maximum number of results, if set.
    pub limit: Option<usize>,
}

impl ListBasinsInput {
    /// Default input: empty prefix/start-after, no limit.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the name prefix filter.
    pub fn with_prefix(mut self, prefix: BasinNamePrefix) -> Self {
        self.prefix = prefix;
        self
    }

    /// Sets the pagination cursor.
    pub fn with_start_after(mut self, start_after: BasinNameStartAfter) -> Self {
        self.start_after = start_after;
        self
    }

    /// Sets the page-size limit.
    pub fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }
}

impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
    fn from(value: ListBasinsInput) -> Self {
        let ListBasinsInput {
            prefix,
            start_after,
            limit,
        } = value;
        Self {
            prefix: Some(prefix),
            start_after: Some(start_after),
            limit,
        }
    }
}
/// Parameters for exhaustively listing basins across all pages.
#[derive(Debug, Clone, Default)]
pub struct ListAllBasinsInput {
    /// Only list basins whose name has this prefix.
    pub prefix: BasinNamePrefix,
    /// Only list basins whose name sorts after this value.
    pub start_after: BasinNameStartAfter,
    /// Whether deleted basins are included.
    pub include_deleted: bool,
}

impl ListAllBasinsInput {
    /// Default input: empty prefix/start-after, deleted basins excluded.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the name prefix filter.
    pub fn with_prefix(mut self, prefix: BasinNamePrefix) -> Self {
        self.prefix = prefix;
        self
    }

    /// Sets the pagination cursor.
    pub fn with_start_after(mut self, start_after: BasinNameStartAfter) -> Self {
        self.start_after = start_after;
        self
    }

    /// Sets whether deleted basins are included.
    pub fn with_include_deleted(mut self, include_deleted: bool) -> Self {
        self.include_deleted = include_deleted;
        self
    }
}
/// Metadata about a basin.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope the basin belongs to, if reported.
    pub scope: Option<BasinScope>,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time, present if the basin has been deleted.
    pub deleted_at: Option<S2DateTime>,
}

impl TryFrom<api::basin::BasinInfo> for BasinInfo {
    type Error = ValidationError;

    /// Fails if either timestamp is not RFC 3339-representable.
    fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
        let created_at = S2DateTime::try_from(value.created_at)?;
        let deleted_at = match value.deleted_at {
            Some(dt) => Some(S2DateTime::try_from(dt)?),
            None => None,
        };
        Ok(Self {
            name: value.name,
            scope: value.scope.map(Into::into),
            created_at,
            deleted_at,
        })
    }
}
/// Input for deleting a basin.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DeleteBasinInput {
    /// Name of the basin to delete.
    pub name: BasinName,
    /// Whether a missing basin is treated as success.
    pub ignore_not_found: bool,
}

impl DeleteBasinInput {
    /// Input for the given basin; `ignore_not_found` defaults to false.
    pub fn new(name: BasinName) -> Self {
        Self {
            name,
            ignore_not_found: false,
        }
    }

    /// Sets whether a missing basin is treated as success.
    pub fn with_ignore_not_found(mut self, ignore_not_found: bool) -> Self {
        self.ignore_not_found = ignore_not_found;
        self
    }
}
/// Partial update of a stream's timestamping configuration.
///
/// `Maybe::Unspecified` fields are left untouched by the reconfiguration.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct TimestampingReconfiguration {
    /// New timestamping mode, when specified.
    pub mode: Maybe<Option<TimestampingMode>>,
    /// New uncapped flag, when specified.
    pub uncapped: Maybe<Option<bool>>,
}

impl TimestampingReconfiguration {
    /// Empty reconfiguration that changes nothing.
    pub fn new() -> Self {
        Self::default()
    }

    /// Specifies a new timestamping mode.
    pub fn with_mode(mut self, mode: TimestampingMode) -> Self {
        self.mode = Maybe::Specified(Some(mode));
        self
    }

    /// Specifies a new uncapped flag.
    pub fn with_uncapped(mut self, uncapped: bool) -> Self {
        self.uncapped = Maybe::Specified(Some(uncapped));
        self
    }
}

impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
    fn from(value: TimestampingReconfiguration) -> Self {
        Self {
            uncapped: value.uncapped,
            mode: value.mode.map(|m| m.map(Into::into)),
        }
    }
}
/// Partial update of a stream's delete-on-empty configuration.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct DeleteOnEmptyReconfiguration {
    /// New minimum age in seconds, when specified.
    pub min_age_secs: Maybe<Option<u64>>,
}

impl DeleteOnEmptyReconfiguration {
    /// Empty reconfiguration that changes nothing.
    pub fn new() -> Self {
        Self::default()
    }

    /// Specifies a new minimum age; sub-second precision is truncated.
    pub fn with_min_age(self, min_age: Duration) -> Self {
        let min_age_secs = Maybe::Specified(Some(min_age.as_secs()));
        Self { min_age_secs }
    }
}

impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
        let DeleteOnEmptyReconfiguration { min_age_secs } = value;
        Self { min_age_secs }
    }
}
/// Partial update of a stream's configuration.
///
/// `Maybe::Unspecified` fields are left untouched by the reconfiguration.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct StreamReconfiguration {
    /// New storage class, when specified.
    pub storage_class: Maybe<Option<StorageClass>>,
    /// New retention policy, when specified.
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Timestamping changes, when specified.
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Delete-on-empty changes, when specified.
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}

impl StreamReconfiguration {
    /// Empty reconfiguration that changes nothing.
    pub fn new() -> Self {
        Self::default()
    }

    /// Specifies a new storage class.
    pub fn with_storage_class(mut self, storage_class: StorageClass) -> Self {
        self.storage_class = Maybe::Specified(Some(storage_class));
        self
    }

    /// Specifies a new retention policy.
    pub fn with_retention_policy(mut self, retention_policy: RetentionPolicy) -> Self {
        self.retention_policy = Maybe::Specified(Some(retention_policy));
        self
    }

    /// Specifies timestamping changes.
    pub fn with_timestamping(mut self, timestamping: TimestampingReconfiguration) -> Self {
        self.timestamping = Maybe::Specified(Some(timestamping));
        self
    }

    /// Specifies delete-on-empty changes.
    pub fn with_delete_on_empty(mut self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
        self.delete_on_empty = Maybe::Specified(Some(delete_on_empty));
        self
    }
}

impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
    fn from(value: StreamReconfiguration) -> Self {
        Self {
            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
            timestamping: value.timestamping.map(|m| m.map(Into::into)),
            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
            storage_class: value.storage_class.map(|m| m.map(Into::into)),
        }
    }
}
/// Partial update of a basin's configuration.
///
/// `Maybe::Unspecified` fields are left untouched by the reconfiguration.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct BasinReconfiguration {
    /// Default-stream-config changes, when specified.
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// New create-on-append flag, when specified.
    pub create_stream_on_append: Maybe<bool>,
    /// New create-on-read flag, when specified.
    pub create_stream_on_read: Maybe<bool>,
}

impl BasinReconfiguration {
    /// Empty reconfiguration that changes nothing.
    pub fn new() -> Self {
        Self::default()
    }

    /// Specifies default-stream-config changes.
    pub fn with_default_stream_config(mut self, config: StreamReconfiguration) -> Self {
        self.default_stream_config = Maybe::Specified(Some(config));
        self
    }

    /// Specifies a new create-on-append flag.
    pub fn with_create_stream_on_append(mut self, create_stream_on_append: bool) -> Self {
        self.create_stream_on_append = Maybe::Specified(create_stream_on_append);
        self
    }

    /// Specifies a new create-on-read flag.
    pub fn with_create_stream_on_read(mut self, create_stream_on_read: bool) -> Self {
        self.create_stream_on_read = Maybe::Specified(create_stream_on_read);
        self
    }
}

impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
    fn from(value: BasinReconfiguration) -> Self {
        Self {
            create_stream_on_read: value.create_stream_on_read,
            create_stream_on_append: value.create_stream_on_append,
            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
        }
    }
}
/// Input for reconfiguring a basin.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureBasinInput {
    // Name of the basin to reconfigure.
    pub name: BasinName,
    // Partial configuration changes to apply.
    pub config: BasinReconfiguration,
}
impl ReconfigureBasinInput {
    /// Pairs a basin name with the reconfiguration to apply.
    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
        Self { name, config }
    }
}
/// Parameters for listing one page of access tokens.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListAccessTokensInput {
    /// Only list tokens whose ID has this prefix.
    pub prefix: AccessTokenIdPrefix,
    /// Only list tokens whose ID sorts after this value.
    pub start_after: AccessTokenIdStartAfter,
    /// Maximum number of results, if set.
    pub limit: Option<usize>,
}

impl ListAccessTokensInput {
    /// Default input: empty prefix/start-after, no limit.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the ID prefix filter.
    pub fn with_prefix(mut self, prefix: AccessTokenIdPrefix) -> Self {
        self.prefix = prefix;
        self
    }

    /// Sets the pagination cursor.
    pub fn with_start_after(mut self, start_after: AccessTokenIdStartAfter) -> Self {
        self.start_after = start_after;
        self
    }

    /// Sets the page-size limit.
    pub fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }
}

impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
    fn from(value: ListAccessTokensInput) -> Self {
        let ListAccessTokensInput {
            prefix,
            start_after,
            limit,
        } = value;
        Self {
            prefix: Some(prefix),
            start_after: Some(start_after),
            limit,
        }
    }
}
/// Parameters for exhaustively listing access tokens across all pages.
#[derive(Debug, Clone, Default)]
pub struct ListAllAccessTokensInput {
    /// Only list tokens whose ID has this prefix.
    pub prefix: AccessTokenIdPrefix,
    /// Only list tokens whose ID sorts after this value.
    pub start_after: AccessTokenIdStartAfter,
}

impl ListAllAccessTokensInput {
    /// Default input: empty prefix and start-after.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the ID prefix filter.
    pub fn with_prefix(mut self, prefix: AccessTokenIdPrefix) -> Self {
        self.prefix = prefix;
        self
    }

    /// Sets the pagination cursor.
    pub fn with_start_after(mut self, start_after: AccessTokenIdStartAfter) -> Self {
        self.start_after = start_after;
        self
    }
}
/// Metadata about an issued access token.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenInfo {
    /// Token ID.
    pub id: AccessTokenId,
    /// Expiry time.
    pub expires_at: S2DateTime,
    /// Auto-prefix-streams flag.
    pub auto_prefix_streams: bool,
    /// Scope the token is restricted to.
    pub scope: AccessTokenScope,
}

impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
    type Error = ValidationError;

    /// Fails if `expires_at` is absent or not RFC 3339-representable.
    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
        let expires_at = match value.expires_at {
            None => return Err(ValidationError::from("missing expires_at")),
            Some(dt) => S2DateTime::try_from(dt)?,
        };
        Ok(Self {
            id: value.id,
            expires_at,
            // Absent flag maps to false.
            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
            scope: value.scope.into(),
        })
    }
}
/// Selects which basins a token scope applies to.
#[derive(Debug, Clone)]
pub enum BasinMatcher {
    /// Matches no basins.
    None,
    /// Matches exactly one basin by name.
    Exact(BasinName),
    /// Matches basins whose name has the given prefix.
    Prefix(BasinNamePrefix),
}
/// Selects which streams a token scope applies to.
#[derive(Debug, Clone)]
pub enum StreamMatcher {
    /// Matches no streams.
    None,
    /// Matches exactly one stream by name.
    Exact(StreamName),
    /// Matches streams whose name has the given prefix.
    Prefix(StreamNamePrefix),
}
/// Selects which access tokens a token scope applies to.
#[derive(Debug, Clone)]
pub enum AccessTokenMatcher {
    /// Matches no tokens.
    None,
    /// Matches exactly one token by ID.
    Exact(AccessTokenId),
    /// Matches tokens whose ID has the given prefix.
    Prefix(AccessTokenIdPrefix),
}
/// Read/write permission pair for an operation group.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Read permission.
    pub read: bool,
    /// Write permission.
    pub write: bool,
}

impl ReadWritePermissions {
    /// No permissions.
    pub fn new() -> Self {
        Self::default()
    }

    /// Read permission only.
    pub fn read_only() -> Self {
        Self {
            read: true,
            ..Self::default()
        }
    }

    /// Write permission only.
    pub fn write_only() -> Self {
        Self {
            write: true,
            ..Self::default()
        }
    }

    /// Both read and write permissions.
    pub fn read_write() -> Self {
        Self {
            read: true,
            write: true,
        }
    }
}
impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
fn from(value: ReadWritePermissions) -> Self {
Self {
read: Some(value.read),
write: Some(value.write),
}
}
}
impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
fn from(value: api::access::ReadWritePermissions) -> Self {
Self {
read: value.read.unwrap_or_default(),
write: value.write.unwrap_or_default(),
}
}
}
/// Permissions granted per operation group (account, basin, stream).
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct OperationGroupPermissions {
    /// Account-group permissions, if set.
    pub account: Option<ReadWritePermissions>,
    /// Basin-group permissions, if set.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-group permissions, if set.
    pub stream: Option<ReadWritePermissions>,
}

impl OperationGroupPermissions {
    /// No permissions set for any group.
    pub fn new() -> Self {
        Self::default()
    }

    // Applies the same permissions to all three groups.
    fn uniform(perms: ReadWritePermissions) -> Self {
        Self {
            account: Some(perms.clone()),
            basin: Some(perms.clone()),
            stream: Some(perms),
        }
    }

    /// Read-only permissions on every group.
    pub fn read_only_all() -> Self {
        Self::uniform(ReadWritePermissions::read_only())
    }

    /// Write-only permissions on every group.
    pub fn write_only_all() -> Self {
        Self::uniform(ReadWritePermissions::write_only())
    }

    /// Read-write permissions on every group.
    pub fn read_write_all() -> Self {
        Self::uniform(ReadWritePermissions::read_write())
    }

    /// Sets account-group permissions.
    pub fn with_account(mut self, account: ReadWritePermissions) -> Self {
        self.account = Some(account);
        self
    }

    /// Sets basin-group permissions.
    pub fn with_basin(mut self, basin: ReadWritePermissions) -> Self {
        self.basin = Some(basin);
        self
    }

    /// Sets stream-group permissions.
    pub fn with_stream(mut self, stream: ReadWritePermissions) -> Self {
        self.stream = Some(stream);
        self
    }
}

impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
    fn from(value: OperationGroupPermissions) -> Self {
        Self {
            stream: value.stream.map(Into::into),
            basin: value.basin.map(Into::into),
            account: value.account.map(Into::into),
        }
    }
}

impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
    fn from(value: api::access::PermittedOperationGroups) -> Self {
        Self {
            stream: value.stream.map(Into::into),
            basin: value.basin.map(Into::into),
            account: value.account.map(Into::into),
        }
    }
}
/// Individual operations an access token may be permitted to perform.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    GetBasinConfig,
    DeleteBasin,
    ReconfigureBasin,
    ListAccessTokens,
    IssueAccessToken,
    RevokeAccessToken,
    GetAccountMetrics,
    GetBasinMetrics,
    GetStreamMetrics,
    ListStreams,
    CreateStream,
    GetStreamConfig,
    DeleteStream,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
impl From<Operation> for api::access::Operation {
fn from(value: Operation) -> Self {
match value {
Operation::ListBasins => api::access::Operation::ListBasins,
Operation::CreateBasin => api::access::Operation::CreateBasin,
Operation::DeleteBasin => api::access::Operation::DeleteBasin,
Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
Operation::ListStreams => api::access::Operation::ListStreams,
Operation::CreateStream => api::access::Operation::CreateStream,
Operation::DeleteStream => api::access::Operation::DeleteStream,
Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
Operation::CheckTail => api::access::Operation::CheckTail,
Operation::Append => api::access::Operation::Append,
Operation::Read => api::access::Operation::Read,
Operation::Trim => api::access::Operation::Trim,
Operation::Fence => api::access::Operation::Fence,
Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
}
}
}
impl From<api::access::Operation> for Operation {
fn from(value: api::access::Operation) -> Self {
match value {
api::access::Operation::ListBasins => Operation::ListBasins,
api::access::Operation::CreateBasin => Operation::CreateBasin,
api::access::Operation::DeleteBasin => Operation::DeleteBasin,
api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
api::access::Operation::ListStreams => Operation::ListStreams,
api::access::Operation::CreateStream => Operation::CreateStream,
api::access::Operation::DeleteStream => Operation::DeleteStream,
api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
api::access::Operation::CheckTail => Operation::CheckTail,
api::access::Operation::Append => Operation::Append,
api::access::Operation::Read => Operation::Read,
api::access::Operation::Trim => Operation::Trim,
api::access::Operation::Fence => Operation::Fence,
api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
}
}
}
/// Builder for the scope of an access token to be issued.
///
/// Start from [`Self::from_ops`] or [`Self::from_op_group_perms`], then
/// narrow with the `with_*` methods.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScopeInput {
    basins: Option<BasinMatcher>,
    streams: Option<StreamMatcher>,
    access_tokens: Option<AccessTokenMatcher>,
    op_group_perms: Option<OperationGroupPermissions>,
    ops: HashSet<Operation>,
}

impl AccessTokenScopeInput {
    /// Scope granting exactly the given operations.
    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
        Self {
            ops: ops.into_iter().collect(),
            op_group_perms: None,
            access_tokens: None,
            streams: None,
            basins: None,
        }
    }

    /// Scope granting operation-group permissions, with no individual ops.
    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
        Self {
            ops: HashSet::default(),
            op_group_perms: Some(op_group_perms),
            access_tokens: None,
            streams: None,
            basins: None,
        }
    }

    /// Replaces the set of individual operations.
    pub fn with_ops(mut self, ops: impl IntoIterator<Item = Operation>) -> Self {
        self.ops = ops.into_iter().collect();
        self
    }

    /// Sets the operation-group permissions.
    pub fn with_op_group_perms(mut self, op_group_perms: OperationGroupPermissions) -> Self {
        self.op_group_perms = Some(op_group_perms);
        self
    }

    /// Restricts the scope to matching basins.
    pub fn with_basins(mut self, basins: BasinMatcher) -> Self {
        self.basins = Some(basins);
        self
    }

    /// Restricts the scope to matching streams.
    pub fn with_streams(mut self, streams: StreamMatcher) -> Self {
        self.streams = Some(streams);
        self
    }

    /// Restricts the scope to matching access tokens.
    pub fn with_access_tokens(mut self, access_tokens: AccessTokenMatcher) -> Self {
        self.access_tokens = Some(access_tokens);
        self
    }
}
/// The scope of an access token as reported by the service.
///
/// Mirrors [`AccessTokenScopeInput`] but with public fields, since this type
/// is read from API responses rather than built by callers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
/// Basins the token applies to, if restricted.
pub basins: Option<BasinMatcher>,
/// Streams the token applies to, if restricted.
pub streams: Option<StreamMatcher>,
/// Access tokens the token applies to, if restricted.
pub access_tokens: Option<AccessTokenMatcher>,
/// Operation-group permissions, if any were granted.
pub op_group_perms: Option<OperationGroupPermissions>,
/// Individual operations granted; empty when none were specified.
pub ops: HashSet<Operation>,
}
// Decodes the wire representation of a token scope. For each resource kind,
// `Exact(Empty)` maps to the `None` matcher, `Exact(NonEmpty)` to `Exact`, and
// `Prefix` to `Prefix`. An absent `ops` list becomes an empty set.
impl From<api::access::AccessTokenScope> for AccessTokenScope {
    fn from(value: api::access::AccessTokenScope) -> Self {
        use api::access::{MaybeEmpty, ResourceSet};
        Self {
            basins: value.basins.map(|rs| match rs {
                ResourceSet::Exact(MaybeEmpty::NonEmpty(e)) => BasinMatcher::Exact(e),
                ResourceSet::Exact(MaybeEmpty::Empty) => BasinMatcher::None,
                ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
            }),
            streams: value.streams.map(|rs| match rs {
                ResourceSet::Exact(MaybeEmpty::NonEmpty(e)) => StreamMatcher::Exact(e),
                ResourceSet::Exact(MaybeEmpty::Empty) => StreamMatcher::None,
                ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
            }),
            access_tokens: value.access_tokens.map(|rs| match rs {
                ResourceSet::Exact(MaybeEmpty::NonEmpty(e)) => AccessTokenMatcher::Exact(e),
                ResourceSet::Exact(MaybeEmpty::Empty) => AccessTokenMatcher::None,
                ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
            }),
            op_group_perms: value.op_groups.map(Into::into),
            ops: value
                .ops
                .map(|ops| ops.into_iter().map(Into::into).collect())
                .unwrap_or_default(),
        }
    }
}
impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
fn from(value: AccessTokenScopeInput) -> Self {
Self {
basins: value.basins.map(|rs| match rs {
BasinMatcher::None => {
api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
}
BasinMatcher::Exact(e) => {
api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
}
BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
}),
streams: value.streams.map(|rs| match rs {
StreamMatcher::None => {
api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
}
StreamMatcher::Exact(e) => {
api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
}
StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
}),
access_tokens: value.access_tokens.map(|rs| match rs {
AccessTokenMatcher::None => {
api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
}
AccessTokenMatcher::Exact(e) => {
api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
}
AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
}),
op_groups: value.op_group_perms.map(Into::into),
ops: if value.ops.is_empty() {
None
} else {
Some(value.ops.into_iter().map(Into::into).collect())
},
}
}
}
/// Input for issuing a new access token.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IssueAccessTokenInput {
    /// Identifier for the new token.
    pub id: AccessTokenId,
    /// Optional expiry; `None` means no expiry is requested.
    pub expires_at: Option<S2DateTime>,
    /// Whether stream names should be automatically prefixed. Defaults to `false`.
    pub auto_prefix_streams: bool,
    /// The scope the token is granted.
    pub scope: AccessTokenScopeInput,
}

impl IssueAccessTokenInput {
    /// Creates an input with no expiry and auto-prefixing disabled.
    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
        Self {
            id,
            expires_at: None,
            auto_prefix_streams: false,
            scope,
        }
    }

    /// Sets an expiry time for the token.
    pub fn with_expires_at(mut self, expires_at: S2DateTime) -> Self {
        self.expires_at = Some(expires_at);
        self
    }

    /// Enables or disables automatic stream-name prefixing.
    pub fn with_auto_prefix_streams(mut self, auto_prefix_streams: bool) -> Self {
        self.auto_prefix_streams = auto_prefix_streams;
        self
    }
}
// Encodes the issue-token input for the wire. `auto_prefix_streams == false`
// is encoded as absent rather than `Some(false)`.
impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
    fn from(value: IssueAccessTokenInput) -> Self {
        let auto_prefix_streams = if value.auto_prefix_streams {
            Some(true)
        } else {
            None
        };
        Self {
            id: value.id,
            expires_at: value.expires_at.map(Into::into),
            auto_prefix_streams,
            scope: value.scope.into(),
        }
    }
}
/// Granularity of a timeseries metric.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    Minute,
    Hour,
    Day,
}

// SDK -> wire: variants correspond one-to-one.
impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
    fn from(value: TimeseriesInterval) -> Self {
        use api::metrics::TimeseriesInterval as ApiInterval;
        match value {
            TimeseriesInterval::Minute => ApiInterval::Minute,
            TimeseriesInterval::Hour => ApiInterval::Hour,
            TimeseriesInterval::Day => ApiInterval::Day,
        }
    }
}

// Wire -> SDK: the inverse of the conversion above.
impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
    fn from(value: api::metrics::TimeseriesInterval) -> Self {
        use api::metrics::TimeseriesInterval as ApiInterval;
        match value {
            ApiInterval::Minute => Self::Minute,
            ApiInterval::Hour => Self::Hour,
            ApiInterval::Day => Self::Day,
        }
    }
}
/// A half-specified time window for metrics queries.
///
/// `start` and `end` are raw `u32` values; presumably Unix-epoch seconds,
/// matching the `start`/`end` fields of the metric requests — TODO confirm.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Window start.
    pub start: u32,
    /// Window end.
    pub end: u32,
}

impl TimeRange {
    /// Creates a range covering `start..end`.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
/// A time window plus an optional timeseries granularity.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRangeAndInterval {
    /// Window start.
    pub start: u32,
    /// Window end.
    pub end: u32,
    /// Granularity of the returned series; `None` lets the service choose.
    pub interval: Option<TimeseriesInterval>,
}

impl TimeRangeAndInterval {
    /// Creates a range covering `start..end` with no explicit interval.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRangeAndInterval {
            start,
            end,
            interval: None,
        }
    }

    /// Sets the timeseries interval.
    pub fn with_interval(mut self, interval: TimeseriesInterval) -> Self {
        self.interval = Some(interval);
        self
    }
}
/// Which account-level metric set to query, with its time parameters.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
/// Count of active basins over a time range (no interval parameter).
ActiveBasins(TimeRange),
/// Account-level operation counts over a time range, optionally bucketed.
AccountOps(TimeRangeAndInterval),
}
/// Input for fetching account metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
/// The metric set and time parameters to query.
pub set: AccountMetricSet,
}
impl GetAccountMetricsInput {
/// Creates an input for the given metric set.
pub fn new(set: AccountMetricSet) -> Self {
Self { set }
}
}
// Flattens the typed metric-set input into the wire request. ActiveBasins
// carries no interval; AccountOps forwards its optional interval.
impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
    fn from(value: GetAccountMetricsInput) -> Self {
        use api::metrics::AccountMetricSet as ApiSet;
        match value.set {
            AccountMetricSet::ActiveBasins(range) => Self {
                set: ApiSet::ActiveBasins,
                start: Some(range.start),
                end: Some(range.end),
                interval: None,
            },
            AccountMetricSet::AccountOps(range) => Self {
                set: ApiSet::AccountOps,
                start: Some(range.start),
                end: Some(range.end),
                interval: range.interval.map(Into::into),
            },
        }
    }
}
/// Which basin-level metric set to query, with its time parameters.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
/// Storage usage over a time range (no interval parameter).
Storage(TimeRange),
/// Append operation counts, optionally bucketed by interval.
AppendOps(TimeRangeAndInterval),
/// Read operation counts, optionally bucketed by interval.
ReadOps(TimeRangeAndInterval),
/// Read throughput, optionally bucketed by interval.
ReadThroughput(TimeRangeAndInterval),
/// Append throughput, optionally bucketed by interval.
AppendThroughput(TimeRangeAndInterval),
/// All basin operation counts, optionally bucketed by interval.
BasinOps(TimeRangeAndInterval),
}
/// Input for fetching metrics of a single basin.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
/// The basin to query.
pub name: BasinName,
/// The metric set and time parameters to query.
pub set: BasinMetricSet,
}
impl GetBasinMetricsInput {
/// Creates an input for the given basin and metric set.
pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
Self { name, set }
}
}
// Splits a basin-metrics query into the basin name and the wire request.
// Storage is a plain time range; every other set forwards its optional
// interval.
impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
    fn from(value: GetBasinMetricsInput) -> Self {
        use api::metrics::BasinMetricSet as ApiSet;
        let (set, start, end, interval) = match value.set {
            BasinMetricSet::Storage(r) => (ApiSet::Storage, r.start, r.end, None),
            BasinMetricSet::AppendOps(r) => (ApiSet::AppendOps, r.start, r.end, r.interval),
            BasinMetricSet::ReadOps(r) => (ApiSet::ReadOps, r.start, r.end, r.interval),
            BasinMetricSet::ReadThroughput(r) => {
                (ApiSet::ReadThroughput, r.start, r.end, r.interval)
            }
            BasinMetricSet::AppendThroughput(r) => {
                (ApiSet::AppendThroughput, r.start, r.end, r.interval)
            }
            BasinMetricSet::BasinOps(r) => (ApiSet::BasinOps, r.start, r.end, r.interval),
        };
        let request = api::metrics::BasinMetricSetRequest {
            set,
            start: Some(start),
            end: Some(end),
            interval: interval.map(Into::into),
        };
        (value.name, request)
    }
}
/// Which stream-level metric set to query, with its time parameters.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
/// Storage usage over a time range (no interval parameter).
Storage(TimeRange),
}
/// Input for fetching metrics of a single stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
/// The basin the stream belongs to.
pub basin_name: BasinName,
/// The stream to query.
pub stream_name: StreamName,
/// The metric set and time parameters to query.
pub set: StreamMetricSet,
}
impl GetStreamMetricsInput {
/// Creates an input for the given stream and metric set.
pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
Self {
basin_name,
stream_name,
set,
}
}
}
// Splits a stream-metrics query into basin name, stream name, and the wire
// request. Storage (the only variant) carries no interval.
impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
    fn from(value: GetStreamMetricsInput) -> Self {
        let request = match value.set {
            StreamMetricSet::Storage(range) => api::metrics::StreamMetricSetRequest {
                set: api::metrics::StreamMetricSet::Storage,
                start: Some(range.start),
                end: Some(range.end),
                interval: None,
            },
        };
        (value.basin_name, value.stream_name, request)
    }
}
/// Unit of a metric value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// Value is a byte count.
    Bytes,
    /// Value is an operation count.
    Operations,
}

// Wire -> SDK: variants correspond one-to-one.
impl From<api::metrics::MetricUnit> for MetricUnit {
    fn from(value: api::metrics::MetricUnit) -> Self {
        use api::metrics::MetricUnit as ApiUnit;
        match value {
            ApiUnit::Bytes => Self::Bytes,
            ApiUnit::Operations => Self::Operations,
        }
    }
}
/// A single named metric value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
/// Metric name.
pub name: String,
/// Unit of `value`.
pub unit: MetricUnit,
/// The metric value.
pub value: f64,
}
/// A timeseries metric accumulated over fixed intervals.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
/// Metric name.
pub name: String,
/// Unit of each value.
pub unit: MetricUnit,
/// The interval each accumulated value covers.
pub interval: TimeseriesInterval,
/// `(timestamp, value)` pairs — presumably Unix-epoch seconds, matching the
/// request `start`/`end` fields; TODO confirm.
pub values: Vec<(u32, f64)>,
}
/// A timeseries metric sampled at points in time (no fixed interval).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
/// Metric name.
pub name: String,
/// Unit of each value.
pub unit: MetricUnit,
/// `(timestamp, value)` pairs.
pub values: Vec<(u32, f64)>,
}
/// A metric whose values are labels rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
/// Metric name.
pub name: String,
/// Label values.
pub values: Vec<String>,
}
/// Any metric returned by the metrics endpoints.
#[derive(Debug, Clone)]
pub enum Metric {
Scalar(ScalarMetric),
Accumulation(AccumulationMetric),
Gauge(GaugeMetric),
Label(LabelMetric),
}
// Decodes a wire metric into the SDK representation, converting names and
// units field by field.
impl From<api::metrics::Metric> for Metric {
    fn from(value: api::metrics::Metric) -> Self {
        use api::metrics::Metric as ApiMetric;
        match value {
            ApiMetric::Scalar(m) => Self::Scalar(ScalarMetric {
                name: m.name.into(),
                unit: m.unit.into(),
                value: m.value,
            }),
            ApiMetric::Accumulation(m) => Self::Accumulation(AccumulationMetric {
                name: m.name.into(),
                unit: m.unit.into(),
                interval: m.interval.into(),
                values: m.values,
            }),
            ApiMetric::Gauge(m) => Self::Gauge(GaugeMetric {
                name: m.name.into(),
                unit: m.unit.into(),
                values: m.values,
            }),
            ApiMetric::Label(m) => Self::Label(LabelMetric {
                name: m.name.into(),
                values: m.values,
            }),
        }
    }
}
/// Input for a paginated stream listing.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListStreamsInput {
    /// Only list streams whose names start with this prefix.
    pub prefix: StreamNamePrefix,
    /// Only list streams whose names sort after this value.
    pub start_after: StreamNameStartAfter,
    /// Maximum number of results; `None` uses the service default.
    pub limit: Option<usize>,
}

impl ListStreamsInput {
    /// Creates an input with default (unrestricted) parameters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the name prefix filter.
    pub fn with_prefix(mut self, prefix: StreamNamePrefix) -> Self {
        self.prefix = prefix;
        self
    }

    /// Sets the pagination cursor.
    pub fn with_start_after(mut self, start_after: StreamNameStartAfter) -> Self {
        self.start_after = start_after;
        self
    }

    /// Sets the maximum number of results.
    pub fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }
}
impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
fn from(value: ListStreamsInput) -> Self {
Self {
prefix: Some(value.prefix),
start_after: Some(value.start_after),
limit: value.limit,
}
}
}
/// Input for exhaustively listing streams (auto-paginating variants).
#[derive(Debug, Clone, Default)]
pub struct ListAllStreamsInput {
    /// Only list streams whose names start with this prefix.
    pub prefix: StreamNamePrefix,
    /// Only list streams whose names sort after this value.
    pub start_after: StreamNameStartAfter,
    /// Whether to include deleted streams in the listing.
    pub include_deleted: bool,
}

impl ListAllStreamsInput {
    /// Creates an input with default (unrestricted) parameters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the name prefix filter.
    pub fn with_prefix(mut self, prefix: StreamNamePrefix) -> Self {
        self.prefix = prefix;
        self
    }

    /// Sets the pagination cursor.
    pub fn with_start_after(mut self, start_after: StreamNameStartAfter) -> Self {
        self.start_after = start_after;
        self
    }

    /// Includes or excludes deleted streams.
    pub fn with_include_deleted(mut self, include_deleted: bool) -> Self {
        self.include_deleted = include_deleted;
        self
    }
}
/// Metadata about a stream.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time, if the stream has been deleted.
    pub deleted_at: Option<S2DateTime>,
}

// Decodes wire stream info; fails if either timestamp is not a valid
// RFC 3339 datetime. `created_at` is validated before `deleted_at`.
impl TryFrom<api::stream::StreamInfo> for StreamInfo {
    type Error = ValidationError;
    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
        let created_at: S2DateTime = value.created_at.try_into()?;
        let deleted_at = match value.deleted_at {
            Some(ts) => Some(S2DateTime::try_from(ts)?),
            None => None,
        };
        Ok(Self {
            name: value.name,
            created_at,
            deleted_at,
        })
    }
}
/// Input for creating a stream.
///
/// A fresh idempotency token is generated at construction so retries of the
/// same input are deduplicated by the service.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateStreamInput {
    /// Name of the stream to create.
    pub name: StreamName,
    /// Optional stream configuration; `None` uses service defaults.
    pub config: Option<StreamConfig>,
    // Generated once in `new`; not exposed to callers.
    idempotency_token: String,
}

impl CreateStreamInput {
    /// Creates an input for `name` with no explicit configuration.
    pub fn new(name: StreamName) -> Self {
        Self {
            name,
            config: None,
            idempotency_token: idempotency_token(),
        }
    }

    /// Sets the stream configuration.
    pub fn with_config(mut self, config: StreamConfig) -> Self {
        self.config = Some(config);
        self
    }
}
// Splits the input into the wire request and the idempotency token (which is
// sent out-of-band, e.g. as a header).
impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
    fn from(value: CreateStreamInput) -> Self {
        let request = api::stream::CreateStreamRequest {
            stream: value.name,
            config: value.config.map(Into::into),
        };
        (request, value.idempotency_token)
    }
}
/// Input for creating a stream or reconfiguring it if it already exists.
///
/// Hidden behind the `_hidden` feature; not part of the stable public API.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureStreamInput {
/// Name of the stream to create or reconfigure.
pub name: StreamName,
/// Optional reconfiguration to apply; `None` leaves/uses defaults.
pub config: Option<StreamReconfiguration>,
}
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureStreamInput {
/// Creates an input for `name` with no configuration changes.
pub fn new(name: StreamName) -> Self {
Self { name, config: None }
}
/// Sets the reconfiguration to apply.
pub fn with_config(self, config: StreamReconfiguration) -> Self {
Self {
config: Some(config),
..self
}
}
}
/// Splits the input into the stream name and optional wire reconfiguration.
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureStreamInput>
for (StreamName, Option<api::config::StreamReconfiguration>)
{
fn from(value: CreateOrReconfigureStreamInput) -> Self {
(value.name, value.config.map(Into::into))
}
}
/// Input for deleting a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DeleteStreamInput {
    /// Name of the stream to delete.
    pub name: StreamName,
    /// When `true`, a missing stream is not treated as an error.
    pub ignore_not_found: bool,
}

impl DeleteStreamInput {
    /// Creates an input that errors if the stream does not exist.
    pub fn new(name: StreamName) -> Self {
        Self {
            name,
            ignore_not_found: false,
        }
    }

    /// Sets whether a missing stream should be ignored.
    pub fn with_ignore_not_found(mut self, ignore_not_found: bool) -> Self {
        self.ignore_not_found = ignore_not_found;
        self
    }
}
/// Input for reconfiguring an existing stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
/// Name of the stream to reconfigure.
pub name: StreamName,
/// The reconfiguration to apply.
pub config: StreamReconfiguration,
}
impl ReconfigureStreamInput {
/// Creates an input applying `config` to the stream `name`.
pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
Self { name, config }
}
}
/// A fencing token used to guard appends against concurrent writers.
///
/// Limited to `MAX_FENCING_TOKEN_LENGTH` bytes; construct via `FromStr` or
/// [`FencingToken::generate`].
#[derive(Debug, Clone, PartialEq)]
pub struct FencingToken(String);
impl FencingToken {
/// Generates a random alphanumeric token of `n` characters.
///
/// Returns a `ValidationError` if `n` exceeds the maximum token length
/// (the generated string is validated via the `FromStr` impl below).
pub fn generate(n: usize) -> Result<Self, ValidationError> {
rand::rng()
.sample_iter(&rand::distr::Alphanumeric)
.take(n)
.map(char::from)
.collect::<String>()
.parse()
}
}
// Validates the byte length of the token; any content is otherwise accepted.
impl FromStr for FencingToken {
    type Err = ValidationError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.len() <= MAX_FENCING_TOKEN_LENGTH {
            Ok(FencingToken(s.to_owned()))
        } else {
            Err(ValidationError(format!(
                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
            )))
        }
    }
}
// Displays the token verbatim.
impl std::fmt::Display for FencingToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

// Lets a token be used wherever a `&str` is expected.
impl Deref for FencingToken {
    type Target = str;
    fn deref(&self) -> &Self::Target {
        self.0.as_str()
    }
}
/// A position in a stream: a sequence number paired with a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number of the record at this position.
    pub seq_num: u64,
    /// Timestamp of the record at this position.
    pub timestamp: u64,
}

// Human-readable form, e.g. `seq_num=5, timestamp=42`.
impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
impl From<api::stream::proto::StreamPosition> for StreamPosition {
fn from(value: api::stream::proto::StreamPosition) -> Self {
Self {
seq_num: value.seq_num,
timestamp: value.timestamp,
}
}
}
impl From<api::stream::StreamPosition> for StreamPosition {
fn from(value: api::stream::StreamPosition) -> Self {
Self {
seq_num: value.seq_num,
timestamp: value.timestamp,
}
}
}
/// A record header: a name/value pair of raw bytes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct Header {
    /// Header name (may be empty; an empty name marks a command record).
    pub name: Bytes,
    /// Header value.
    pub value: Bytes,
}

impl Header {
    /// Creates a header from anything convertible to bytes.
    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
        let name = name.into();
        let value = value.into();
        Self { name, value }
    }
}

// SDK -> proto: a straight field-for-field copy.
impl From<Header> for api::stream::proto::Header {
    fn from(h: Header) -> Self {
        Self {
            name: h.name,
            value: h.value,
        }
    }
}

// proto -> SDK: the inverse copy.
impl From<api::stream::proto::Header> for Header {
    fn from(h: api::stream::proto::Header) -> Self {
        Self {
            name: h.name,
            value: h.value,
        }
    }
}
/// A record to append: a body with optional headers and an optional timestamp.
///
/// Construction and mutation go through fallible methods so a single record
/// can never exceed the batch byte limit.
#[derive(Debug, Clone, PartialEq)]
pub struct AppendRecord {
    body: Bytes,
    headers: Vec<Header>,
    timestamp: Option<u64>,
}

impl AppendRecord {
    // Rejects the record if its metered size exceeds the batch byte cap.
    fn validate(self) -> Result<Self, ValidationError> {
        let size = self.metered_bytes();
        if size > RECORD_BATCH_MAX.bytes {
            Err(ValidationError(format!(
                "metered_bytes: {size} exceeds {}",
                RECORD_BATCH_MAX.bytes
            )))
        } else {
            Ok(self)
        }
    }

    /// Creates a record with the given body, no headers, and no timestamp.
    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
        Self {
            body: body.into(),
            headers: Vec::new(),
            timestamp: None,
        }
        .validate()
    }

    /// Replaces the headers, re-validating the metered size.
    pub fn with_headers(
        mut self,
        headers: impl IntoIterator<Item = Header>,
    ) -> Result<Self, ValidationError> {
        self.headers = headers.into_iter().collect();
        self.validate()
    }

    /// Sets the record timestamp.
    pub fn with_timestamp(mut self, timestamp: u64) -> Self {
        self.timestamp = Some(timestamp);
        self
    }

    /// Record body.
    pub fn body(&self) -> &[u8] {
        self.body.as_ref()
    }

    /// Record headers.
    pub fn headers(&self) -> &[Header] {
        self.headers.as_slice()
    }

    /// Record timestamp, if one was set.
    pub fn timestamp(&self) -> Option<u64> {
        self.timestamp
    }
}
// Encodes a record for the append wire format.
impl From<AppendRecord> for api::stream::proto::AppendRecord {
    fn from(record: AppendRecord) -> Self {
        let headers = record.headers.into_iter().map(Into::into).collect();
        Self {
            timestamp: record.timestamp,
            headers,
            body: record.body,
        }
    }
}
/// Metered size of a record or batch, as counted against S2 limits.
pub trait MeteredBytes {
/// Returns the metered size in bytes.
fn metered_bytes(&self) -> usize;
}
// Shared metered-size formula for record-like types with `headers` and
// `body` fields: 8 bytes fixed overhead, plus 2 bytes per header, plus the
// lengths of all header names, header values, and the body.
macro_rules! metered_bytes_impl {
($ty:ty) => {
impl MeteredBytes for $ty {
fn metered_bytes(&self) -> usize {
8 + (2 * self.headers.len())
+ self
.headers
.iter()
.map(|h| h.name.len() + h.value.len())
.sum::<usize>()
+ self.body.len()
}
}
};
}
metered_bytes_impl!(AppendRecord);
/// A validated batch of records to append.
///
/// [`AppendRecordBatch::try_from_iter`] is the public way to build one; it
/// enforces the batch byte and count limits and rejects empty batches. The
/// crate-internal constructors do not enforce limits.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    records: Vec<AppendRecord>,
    metered_bytes: usize,
}

impl AppendRecordBatch {
    // Internal: empty batch with preallocated capacity; limits are the
    // caller's responsibility.
    pub(crate) fn with_capacity(capacity: usize) -> Self {
        Self {
            records: Vec::with_capacity(capacity),
            metered_bytes: 0,
        }
    }

    // Internal: appends without enforcing batch limits.
    pub(crate) fn push(&mut self, record: AppendRecord) {
        self.metered_bytes += record.metered_bytes();
        self.records.push(record);
    }

    /// Builds a batch from `iter`, failing as soon as the accumulated metered
    /// bytes or record count exceed `RECORD_BATCH_MAX`, or if the iterator
    /// yields no records at all.
    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
    where
        I: IntoIterator<Item = AppendRecord>,
    {
        let mut batch = Self {
            records: Vec::new(),
            metered_bytes: 0,
        };
        for record in iter {
            batch.push(record);
            // Check limits after each record so the error reports the first
            // violation encountered (bytes checked before count).
            if batch.metered_bytes > RECORD_BATCH_MAX.bytes {
                return Err(ValidationError(format!(
                    "batch size in metered bytes ({}) exceeds {}",
                    batch.metered_bytes, RECORD_BATCH_MAX.bytes
                )));
            }
            if batch.records.len() > RECORD_BATCH_MAX.count {
                return Err(ValidationError(format!(
                    "number of records in the batch exceeds {}",
                    RECORD_BATCH_MAX.count
                )));
            }
        }
        if batch.records.is_empty() {
            return Err(ValidationError("batch is empty".into()));
        }
        Ok(batch)
    }
}

// Lets a batch be used wherever a record slice is expected.
impl Deref for AppendRecordBatch {
    type Target = [AppendRecord];
    fn deref(&self) -> &Self::Target {
        self.records.as_slice()
    }
}

// The batch size is maintained incrementally as records are added.
impl MeteredBytes for AppendRecordBatch {
    fn metered_bytes(&self) -> usize {
        self.metered_bytes
    }
}
/// A control command carried in a stream as a command record.
#[derive(Debug, Clone)]
pub enum Command {
    /// Set the stream's fencing token.
    Fence { fencing_token: FencingToken },
    /// Trim the stream up to (but not including) `trim_point`.
    Trim { trim_point: u64 },
}

/// A command plus an optional timestamp, convertible into an [`AppendRecord`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CommandRecord {
    /// The command to encode.
    pub command: Command,
    /// Optional record timestamp.
    pub timestamp: Option<u64>,
}

impl CommandRecord {
    // Header values identifying the command kind on the wire.
    const FENCE: &[u8] = b"fence";
    const TRIM: &[u8] = b"trim";

    /// Creates a fence command record with no timestamp.
    pub fn fence(fencing_token: FencingToken) -> Self {
        Self {
            command: Command::Fence { fencing_token },
            timestamp: None,
        }
    }

    /// Creates a trim command record with no timestamp.
    pub fn trim(trim_point: u64) -> Self {
        Self {
            command: Command::Trim { trim_point },
            timestamp: None,
        }
    }

    /// Sets the record timestamp.
    pub fn with_timestamp(mut self, timestamp: u64) -> Self {
        self.timestamp = Some(timestamp);
        self
    }
}
impl From<CommandRecord> for AppendRecord {
fn from(value: CommandRecord) -> Self {
let (header_value, body) = match value.command {
Command::Fence { fencing_token } => (
CommandRecord::FENCE,
Bytes::copy_from_slice(fencing_token.as_bytes()),
),
Command::Trim { trim_point } => (
CommandRecord::TRIM,
Bytes::copy_from_slice(&trim_point.to_be_bytes()),
),
};
Self {
body,
headers: vec![Header::new("", header_value)],
timestamp: value.timestamp,
}
}
}
/// Input for an append call: the batch plus optional preconditions.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AppendInput {
    /// Records to append.
    pub records: AppendRecordBatch,
    /// If set, the append only succeeds when the stream tail matches this
    /// sequence number.
    pub match_seq_num: Option<u64>,
    /// If set, the append only succeeds when this token matches the stream's
    /// fencing token.
    pub fencing_token: Option<FencingToken>,
}

impl AppendInput {
    /// Creates an unconditional append of `records`.
    pub fn new(records: AppendRecordBatch) -> Self {
        Self {
            records,
            match_seq_num: None,
            fencing_token: None,
        }
    }

    /// Adds a sequence-number precondition.
    pub fn with_match_seq_num(mut self, match_seq_num: u64) -> Self {
        self.match_seq_num = Some(match_seq_num);
        self
    }

    /// Adds a fencing-token precondition.
    pub fn with_fencing_token(mut self, fencing_token: FencingToken) -> Self {
        self.fencing_token = Some(fencing_token);
        self
    }
}
// Encodes an append input for the wire.
impl From<AppendInput> for api::stream::proto::AppendInput {
    fn from(value: AppendInput) -> Self {
        Self {
            // The input is consumed, so move the records out of the batch
            // instead of cloning each one (`records` is a private field of
            // `AppendRecordBatch`, accessible within this module).
            records: value.records.records.into_iter().map(Into::into).collect(),
            match_seq_num: value.match_seq_num,
            fencing_token: value.fencing_token.map(|t| t.to_string()),
        }
    }
}
/// Acknowledgement of a successful append.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct AppendAck {
    /// Position of the first appended record.
    pub start: StreamPosition,
    /// Position just past the last appended record.
    pub end: StreamPosition,
    /// Stream tail after the append.
    pub tail: StreamPosition,
}

// Decodes the wire ack; absent positions fall back to the default (zero)
// position, matching the original behavior.
impl From<api::stream::proto::AppendAck> for AppendAck {
    fn from(ack: api::stream::proto::AppendAck) -> Self {
        fn pos(p: Option<api::stream::proto::StreamPosition>) -> StreamPosition {
            p.unwrap_or_default().into()
        }
        Self {
            start: pos(ack.start),
            end: pos(ack.end),
            tail: pos(ack.tail),
        }
    }
}
/// Where a read should begin.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start at a sequence number.
    SeqNum(u64),
    /// Start at a timestamp.
    Timestamp(u64),
    /// Start at an offset back from the stream tail.
    TailOffset(u64),
}

// Default: read from the beginning of the stream.
impl Default for ReadFrom {
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}

/// Start parameters for a read.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct ReadStart {
    /// Starting position; defaults to sequence number 0.
    pub from: ReadFrom,
    /// Whether to clamp the start position to the current tail.
    pub clamp_to_tail: bool,
}

impl ReadStart {
    /// Creates default start parameters (sequence number 0, no clamping).
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the starting position.
    pub fn with_from(mut self, from: ReadFrom) -> Self {
        self.from = from;
        self
    }

    /// Enables or disables clamping to the tail.
    pub fn with_clamp_to_tail(mut self, clamp_to_tail: bool) -> Self {
        self.clamp_to_tail = clamp_to_tail;
        self
    }
}
// Encodes the start parameters; exactly one of the three position fields is
// populated, and `clamp == false` is encoded as absent.
impl From<ReadStart> for api::stream::ReadStart {
    fn from(value: ReadStart) -> Self {
        let mut out = Self {
            seq_num: None,
            timestamp: None,
            tail_offset: None,
            clamp: value.clamp_to_tail.then_some(true),
        };
        match value.from {
            ReadFrom::SeqNum(n) => out.seq_num = Some(n),
            ReadFrom::Timestamp(t) => out.timestamp = Some(t),
            ReadFrom::TailOffset(o) => out.tail_offset = Some(o),
        }
        out
    }
}
/// Limits on how much a read may return.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Maximum number of records; `None` means unlimited.
    pub count: Option<usize>,
    /// Maximum number of bytes; `None` means unlimited.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates unlimited read limits.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the record-count limit.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Sets the byte limit.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
/// Conditions under which a read stops.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadStop {
    /// Count/byte limits.
    pub limits: ReadLimits,
    /// Stop before this position (exclusive upper bound).
    pub until: Option<RangeTo<u64>>,
    /// How long to wait for more data — presumably seconds; TODO confirm
    /// against the API docs.
    pub wait: Option<u32>,
}

impl ReadStop {
    /// Creates stop conditions with no limits, no bound, and no wait.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the count/byte limits.
    pub fn with_limits(mut self, limits: ReadLimits) -> Self {
        self.limits = limits;
        self
    }

    /// Sets the exclusive upper bound.
    pub fn with_until(mut self, until: RangeTo<u64>) -> Self {
        self.until = Some(until);
        self
    }

    /// Sets the wait duration.
    pub fn with_wait(mut self, wait: u32) -> Self {
        self.wait = Some(wait);
        self
    }
}
// Flattens the stop conditions into the wire request; the `..x` range bound
// becomes its exclusive end value.
impl From<ReadStop> for api::stream::ReadEnd {
    fn from(value: ReadStop) -> Self {
        let until = value.until.map(|range| range.end);
        Self {
            count: value.limits.count,
            bytes: value.limits.bytes,
            until,
            wait: value.wait,
        }
    }
}
/// Input for a read call: start position, stop conditions, and filtering.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadInput {
    /// Where to start reading.
    pub start: ReadStart,
    /// When to stop reading.
    pub stop: ReadStop,
    /// Whether to drop command records from the results.
    pub ignore_command_records: bool,
}

impl ReadInput {
    /// Creates a read from the beginning with no stop conditions.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the start parameters.
    pub fn with_start(mut self, start: ReadStart) -> Self {
        self.start = start;
        self
    }

    /// Sets the stop conditions.
    pub fn with_stop(mut self, stop: ReadStop) -> Self {
        self.stop = stop;
        self
    }

    /// Enables or disables filtering of command records.
    pub fn with_ignore_command_records(mut self, ignore_command_records: bool) -> Self {
        self.ignore_command_records = ignore_command_records;
        self
    }
}
/// A record read back from a stream, with its assigned position.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct SequencedRecord {
    /// Assigned sequence number.
    pub seq_num: u64,
    /// Record body.
    pub body: Bytes,
    /// Record headers.
    pub headers: Vec<Header>,
    /// Assigned timestamp.
    pub timestamp: u64,
}

impl SequencedRecord {
    /// Returns `true` if this is a command record: exactly one header whose
    /// name is empty (the encoding used by [`CommandRecord`]).
    pub fn is_command_record(&self) -> bool {
        match self.headers.as_slice() {
            [only] => only.name.is_empty(),
            _ => false,
        }
    }
}
// Decodes a wire record, converting each header.
impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
    fn from(record: api::stream::proto::SequencedRecord) -> Self {
        let headers = record.headers.into_iter().map(Into::into).collect();
        Self {
            seq_num: record.seq_num,
            body: record.body,
            headers,
            timestamp: record.timestamp,
        }
    }
}
metered_bytes_impl!(SequencedRecord);
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReadBatch {
pub records: Vec<SequencedRecord>,
pub tail: Option<StreamPosition>,
}
impl ReadBatch {
pub(crate) fn from_api(
batch: api::stream::proto::ReadBatch,
ignore_command_records: bool,
) -> Self {
Self {
records: batch
.records
.into_iter()
.map(Into::into)
.filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
.collect(),
tail: batch.tail.map(Into::into),
}
}
}
/// A boxed, pinned, `Send` stream of `Result<T, S2Error>` items.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
/// Why a conditional append was rejected.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendConditionFailed {
    /// The stream's fencing token did not match the supplied one.
    #[error("fencing token mismatch, expected: {0}")]
    FencingTokenMismatch(FencingToken),
    /// The stream tail did not match the supplied sequence number.
    #[error("sequence number mismatch, expected: {0}")]
    SeqNumMismatch(u64),
}

// Decodes the wire failure reason.
impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
    fn from(value: api::stream::AppendConditionFailed) -> Self {
        use api::stream::AppendConditionFailed as ApiFailed;
        match value {
            ApiFailed::FencingTokenMismatch(token) => {
                Self::FencingTokenMismatch(FencingToken(token.to_string()))
            }
            ApiFailed::SeqNumMismatch(seq) => Self::SeqNumMismatch(seq),
        }
    }
}
/// Top-level SDK error.
#[derive(Debug, Clone, thiserror::Error)]
pub enum S2Error {
    /// A client-side error (transport, protocol, or similar), as a message.
    #[error("{0}")]
    Client(String),
    /// Invalid input detected before calling the service.
    #[error(transparent)]
    Validation(#[from] ValidationError),
    /// A conditional append was rejected.
    #[error("{0}")]
    AppendConditionFailed(AppendConditionFailed),
    /// A read started past the current stream tail.
    #[error("read from an unwritten position. current tail: {0}")]
    ReadUnwritten(StreamPosition),
    /// The service returned an error response.
    #[error("{0}")]
    Server(ErrorResponse),
}

// Maps internal API errors onto the public error type; anything without a
// dedicated variant becomes a `Client` error via its display form.
impl From<ApiError> for S2Error {
    fn from(err: ApiError) -> Self {
        match err {
            ApiError::ReadUnwritten(resp) => S2Error::ReadUnwritten(resp.tail.into()),
            ApiError::AppendConditionFailed(cond) => S2Error::AppendConditionFailed(cond.into()),
            ApiError::Server(_, resp) => S2Error::Server(resp.into()),
            other => S2Error::Client(other.to_string()),
        }
    }
}
/// An error response returned by the service.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
pub struct ErrorResponse {
    /// Machine-readable error code.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
}

// Copies the relevant fields out of the internal API response type.
impl From<ApiErrorResponse> for ErrorResponse {
    fn from(resp: ApiErrorResponse) -> Self {
        ErrorResponse {
            code: resp.code,
            message: resp.message,
        }
    }
}
// Generates a fresh idempotency token: a random UUIDv4 in its simple
// (hyphen-free) textual form.
fn idempotency_token() -> String {
    format!("{}", uuid::Uuid::new_v4().simple())
}