use futures::StreamExt;
#[cfg(feature = "_hidden")]
use crate::client::Connect;
#[cfg(feature = "_hidden")]
use crate::types::{
CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, CreateOrReconfigured,
};
use crate::{
api::{AccountClient, BaseClient, BasinClient},
producer::{Producer, ProducerConfig},
session::{self, AppendSession, AppendSessionConfig},
types::{
AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput,
GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
StreamName, StreamPosition, Streaming,
},
};
/// Handle wrapping an `AccountClient`.
///
/// Constructed with `S2::new`; the methods on this type forward to the wrapped
/// client and convert its responses into the public types of this crate.
#[derive(Debug, Clone)]
pub struct S2 {
/// Client that the methods on this handle delegate to.
client: AccountClient,
}
impl S2 {
/// Builds an [`S2`] handle from `config`.
///
/// A `BaseClient` is first initialized from a borrow of `config`; the
/// configuration and that base client are then handed to `AccountClient::init`.
///
/// # Errors
///
/// Returns an [`S2Error`] if `BaseClient::init` fails.
pub fn new(config: S2Config) -> Result<Self, S2Error> {
    let base = BaseClient::init(&config)?;
    let client = AccountClient::init(config, base);
    Ok(Self { client })
}
/// Builds an [`S2`] handle whose base client is initialized with a
/// caller-supplied `connector`.
///
/// Only compiled when the `_hidden` feature is enabled.
///
/// # Errors
///
/// Returns an [`S2Error`] if `BaseClient::init_with_connector` fails.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
where
    C: Connect + Clone + Send + Sync + 'static,
{
    let base = BaseClient::init_with_connector(&config, connector)?;
    let client = AccountClient::init(config, base);
    Ok(Self { client })
}
/// Returns an [`S2Basin`] handle for the basin called `name`.
///
/// The handle wraps the basin client obtained from this handle's client via
/// `basin_client(name)`.
pub fn basin(&self, name: BasinName) -> S2Basin {
    let client = self.client.basin_client(name);
    S2Basin { client }
}
/// Fetches a single page of basins.
///
/// `input` is converted into the client request. Every entry of the response
/// is converted into a [`BasinInfo`], and the response's `has_more` flag is
/// carried over into the returned [`Page`].
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails or if any returned entry
/// cannot be converted into a [`BasinInfo`].
pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
    let response = self.client.list_basins(input.into()).await?;
    let has_more = response.has_more;
    // Collecting into `Result` stops at the first entry that fails to convert.
    let basins = response
        .basins
        .into_iter()
        .map(TryInto::try_into)
        .collect::<Result<Vec<BasinInfo>, _>>()?;
    Ok(Page::new(basins, has_more))
}
/// Returns a stream over all basins, fetched page by page via `list_basins`.
///
/// The first request uses `input.prefix` and `input.start_after`. After each
/// page, the name of the last entry on that page becomes the next
/// `start_after`. Iteration ends when a page reports `has_more == false` or
/// contains no entries.
///
/// Entries whose `deleted_at` is set are skipped unless
/// `input.include_deleted` is `true`.
///
/// Any error from `list_basins` is yielded through the stream.
pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
    let account = self.clone();
    let include_deleted = input.include_deleted;
    let mut request = ListBasinsInput::new()
        .with_prefix(input.prefix)
        .with_start_after(input.start_after);
    Box::pin(async_stream::try_stream! {
        loop {
            let page = account.list_basins(request.clone()).await?;
            // Take the cursor before the loop below consumes the page's values.
            let cursor = page.values.last().map(|info| info.name.clone().into());
            for info in page.values {
                if include_deleted || info.deleted_at.is_none() {
                    yield info;
                }
            }
            match cursor {
                Some(cursor) if page.has_more => {
                    request = request.with_start_after(cursor);
                }
                _ => break,
            }
        }
    })
}
/// Creates a basin.
///
/// `input` is split into a request and an idempotency token, both of which
/// are passed to the client. The returned value is converted into a
/// [`BasinInfo`].
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails or if the returned value
/// cannot be converted into a [`BasinInfo`].
pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
    let (request, idempotency_token) = input.into();
    let created = self.client.create_basin(request, idempotency_token).await?;
    let info: BasinInfo = created.try_into()?;
    Ok(info)
}
/// Creates a basin or reconfigures an existing one, reporting which happened.
///
/// `input` is split into a basin name and a request, which are passed to the
/// client. The client returns a flag plus the basin info: the result is
/// [`CreateOrReconfigured::Created`] when the flag is `true` and
/// [`CreateOrReconfigured::Reconfigured`] otherwise.
///
/// Only compiled when the `_hidden` feature is enabled.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails or if the returned value
/// cannot be converted into a [`BasinInfo`].
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub async fn create_or_reconfigure_basin(
    &self,
    input: CreateOrReconfigureBasinInput,
) -> Result<CreateOrReconfigured<BasinInfo>, S2Error> {
    let (basin_name, request) = input.into();
    let (created, raw_info) = self
        .client
        .create_or_reconfigure_basin(basin_name, request)
        .await?;
    let info: BasinInfo = raw_info.try_into()?;
    if created {
        return Ok(CreateOrReconfigured::Created(info));
    }
    Ok(CreateOrReconfigured::Reconfigured(info))
}
/// Fetches the configuration of the basin called `name`.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
    let api_config = self.client.get_basin_config(name).await?;
    let config: BasinConfig = api_config.into();
    Ok(config)
}
/// Deletes the basin named in `input`.
///
/// `input.name` and `input.ignore_not_found` are passed to the client as-is.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
    self.client
        .delete_basin(input.name, input.ignore_not_found)
        .await?;
    Ok(())
}
/// Reconfigures the basin named in `input` and returns the configuration
/// reported back by the client.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn reconfigure_basin(
    &self,
    input: ReconfigureBasinInput,
) -> Result<BasinConfig, S2Error> {
    let basin_name = input.name;
    let request = input.config.into();
    let updated = self.client.reconfigure_basin(basin_name, request).await?;
    let config: BasinConfig = updated.into();
    Ok(config)
}
/// Fetches a single page of access tokens.
///
/// `input` is converted into the client request. Every entry of the response
/// is converted into an [`AccessTokenInfo`], and the response's `has_more`
/// flag is carried over into the returned [`Page`].
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails or if any returned entry
/// cannot be converted into an [`AccessTokenInfo`].
pub async fn list_access_tokens(
    &self,
    input: ListAccessTokensInput,
) -> Result<Page<AccessTokenInfo>, S2Error> {
    let response = self.client.list_access_tokens(input.into()).await?;
    let has_more = response.has_more;
    // Collecting into `Result` stops at the first entry that fails to convert.
    let tokens = response
        .access_tokens
        .into_iter()
        .map(TryInto::try_into)
        .collect::<Result<Vec<AccessTokenInfo>, _>>()?;
    Ok(Page::new(tokens, has_more))
}
/// Returns a stream over all access tokens, fetched page by page via
/// `list_access_tokens`.
///
/// The first request uses `input.prefix` and `input.start_after`. After each
/// page, the id of the last entry on that page becomes the next
/// `start_after`. Iteration ends when a page reports `has_more == false` or
/// contains no entries.
///
/// Any error from `list_access_tokens` is yielded through the stream.
pub fn list_all_access_tokens(
    &self,
    input: ListAllAccessTokensInput,
) -> Streaming<AccessTokenInfo> {
    let account = self.clone();
    let mut request = ListAccessTokensInput::new()
        .with_prefix(input.prefix)
        .with_start_after(input.start_after);
    Box::pin(async_stream::try_stream! {
        loop {
            let page = account.list_access_tokens(request.clone()).await?;
            // Take the cursor before the loop below consumes the page's values.
            let cursor = page.values.last().map(|info| info.id.clone().into());
            for info in page.values {
                yield info;
            }
            match cursor {
                Some(cursor) if page.has_more => {
                    request = request.with_start_after(cursor);
                }
                _ => break,
            }
        }
    })
}
/// Issues an access token and returns the `access_token` string from the
/// client's response.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn issue_access_token(
    &self,
    input: IssueAccessTokenInput,
) -> Result<String, S2Error> {
    let request = input.into();
    let issued = self.client.issue_access_token(request).await?;
    Ok(issued.access_token)
}
/// Revokes the access token identified by `id`.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
    self.client.revoke_access_token(id).await?;
    Ok(())
}
/// Fetches account metrics, converting each value of the response into a
/// [`Metric`].
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn get_account_metrics(
    &self,
    input: GetAccountMetricsInput,
) -> Result<Vec<Metric>, S2Error> {
    let response = self.client.get_account_metrics(input.into()).await?;
    let metrics: Vec<Metric> = response
        .values
        .into_iter()
        .map(|value| value.into())
        .collect();
    Ok(metrics)
}
/// Fetches metrics for a basin, converting each value of the response into a
/// [`Metric`].
///
/// `input` is split into the basin name and the request passed to the client.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn get_basin_metrics(
    &self,
    input: GetBasinMetricsInput,
) -> Result<Vec<Metric>, S2Error> {
    let (basin, request) = input.into();
    let response = self.client.get_basin_metrics(basin, request).await?;
    let metrics: Vec<Metric> = response
        .values
        .into_iter()
        .map(|value| value.into())
        .collect();
    Ok(metrics)
}
/// Fetches metrics for a stream, converting each value of the response into a
/// [`Metric`].
///
/// `input` is split into the basin name, the stream name, and the request
/// passed to the client.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn get_stream_metrics(
    &self,
    input: GetStreamMetricsInput,
) -> Result<Vec<Metric>, S2Error> {
    let (basin, stream, request) = input.into();
    let response = self
        .client
        .get_stream_metrics(basin, stream, request)
        .await?;
    let metrics: Vec<Metric> = response
        .values
        .into_iter()
        .map(|value| value.into())
        .collect();
    Ok(metrics)
}
}
/// Handle wrapping a `BasinClient`.
///
/// The methods on this type forward to the wrapped client and convert its
/// responses into the public types of this crate.
#[derive(Debug, Clone)]
pub struct S2Basin {
/// Client that the methods on this handle delegate to.
client: BasinClient,
}
impl S2Basin {
/// Returns an [`S2Stream`] handle for the stream called `name`.
///
/// The handle holds a clone of this basin's client together with `name`.
pub fn stream(&self, name: StreamName) -> S2Stream {
    let client = self.client.clone();
    S2Stream { client, name }
}
/// Fetches a single page of streams.
///
/// `input` is converted into the client request. Every entry of the response
/// is converted into a [`StreamInfo`], and the response's `has_more` flag is
/// carried over into the returned [`Page`].
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails or if any returned entry
/// cannot be converted into a [`StreamInfo`].
pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
    let response = self.client.list_streams(input.into()).await?;
    let has_more = response.has_more;
    // Collecting into `Result` stops at the first entry that fails to convert.
    let streams = response
        .streams
        .into_iter()
        .map(TryInto::try_into)
        .collect::<Result<Vec<StreamInfo>, _>>()?;
    Ok(Page::new(streams, has_more))
}
/// Returns a stream over all streams, fetched page by page via
/// `list_streams`.
///
/// The first request uses `input.prefix` and `input.start_after`. After each
/// page, the name of the last entry on that page becomes the next
/// `start_after`. Iteration ends when a page reports `has_more == false` or
/// contains no entries.
///
/// Entries whose `deleted_at` is set are skipped unless
/// `input.include_deleted` is `true`.
///
/// Any error from `list_streams` is yielded through the stream.
pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
    let this = self.clone();
    let include_deleted = input.include_deleted;
    let mut request = ListStreamsInput::new()
        .with_prefix(input.prefix)
        .with_start_after(input.start_after);
    Box::pin(async_stream::try_stream! {
        loop {
            let page = this.list_streams(request.clone()).await?;
            // Take the cursor before the loop below consumes the page's values.
            let cursor = page.values.last().map(|info| info.name.clone().into());
            for info in page.values {
                if include_deleted || info.deleted_at.is_none() {
                    yield info;
                }
            }
            match cursor {
                Some(cursor) if page.has_more => {
                    request = request.with_start_after(cursor);
                }
                _ => break,
            }
        }
    })
}
/// Creates a stream.
///
/// `input` is split into a request and an idempotency token, both of which
/// are passed to the client. The returned value is converted into a
/// [`StreamInfo`].
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails or if the returned value
/// cannot be converted into a [`StreamInfo`].
pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
    let (request, idempotency_token) = input.into();
    let created = self
        .client
        .create_stream(request, idempotency_token)
        .await?;
    let info: StreamInfo = created.try_into()?;
    Ok(info)
}
/// Creates a stream or reconfigures an existing one, reporting which happened.
///
/// `input` is split into a stream name and a request, which are passed to
/// the client. The client returns a flag plus the stream info: the result is
/// [`CreateOrReconfigured::Created`] when the flag is `true` and
/// [`CreateOrReconfigured::Reconfigured`] otherwise.
///
/// Only compiled when the `_hidden` feature is enabled.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails or if the returned value
/// cannot be converted into a [`StreamInfo`].
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub async fn create_or_reconfigure_stream(
    &self,
    input: CreateOrReconfigureStreamInput,
) -> Result<CreateOrReconfigured<StreamInfo>, S2Error> {
    let (stream_name, request) = input.into();
    let (created, raw_info) = self
        .client
        .create_or_reconfigure_stream(stream_name, request)
        .await?;
    let info: StreamInfo = raw_info.try_into()?;
    if created {
        return Ok(CreateOrReconfigured::Created(info));
    }
    Ok(CreateOrReconfigured::Reconfigured(info))
}
/// Fetches the configuration of the stream called `name`.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
    let api_config = self.client.get_stream_config(name).await?;
    let config: StreamConfig = api_config.into();
    Ok(config)
}
/// Deletes the stream named in `input`.
///
/// `input.name` and `input.ignore_not_found` are passed to the client as-is.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
    self.client
        .delete_stream(input.name, input.ignore_not_found)
        .await?;
    Ok(())
}
/// Reconfigures the stream named in `input` and returns the configuration
/// reported back by the client.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn reconfigure_stream(
    &self,
    input: ReconfigureStreamInput,
) -> Result<StreamConfig, S2Error> {
    let stream_name = input.name;
    let request = input.config.into();
    let updated = self.client.reconfigure_stream(stream_name, request).await?;
    let config: StreamConfig = updated.into();
    Ok(config)
}
}
/// Handle pairing a `BasinClient` with the name of one stream.
///
/// The methods on this type pass `name` to the wrapped client (or to the
/// session/producer constructors) so callers do not repeat it on every call.
#[derive(Debug, Clone)]
pub struct S2Stream {
/// Client that the methods on this handle delegate to.
client: BasinClient,
/// Name of the stream this handle operates on.
name: StreamName,
}
impl S2Stream {
/// Asks the client for this stream's tail and returns it as a
/// [`StreamPosition`].
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
    let response = self.client.check_tail(&self.name).await?;
    let tail: StreamPosition = response.tail.into();
    Ok(tail)
}
/// Appends `input` to this stream and returns the acknowledgement.
///
/// The call is made with the `append_retry_policy` taken from the client's
/// retry configuration.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
    let retry_policy = self.client.config.retry.append_retry_policy;
    let raw_ack = self
        .client
        .append(&self.name, input.into(), retry_policy)
        .await?;
    let ack: AppendAck = raw_ack.into();
    Ok(ack)
}
/// Performs a single read from this stream.
///
/// `input.start` and `input.stop` are converted and passed to the client.
/// The returned batch is turned into a [`ReadBatch`] via
/// `ReadBatch::from_api`, which also receives `input.ignore_command_records`.
///
/// # Errors
///
/// Returns an [`S2Error`] if the client call fails.
pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
    let ignore_command_records = input.ignore_command_records;
    let raw_batch = self
        .client
        .read(&self.name, input.start.into(), input.stop.into())
        .await?;
    Ok(ReadBatch::from_api(raw_batch, ignore_command_records))
}
/// Creates an [`AppendSession`] for this stream.
///
/// The session receives clones of this handle's client and stream name,
/// together with `config`.
pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
    let client = self.client.clone();
    let stream = self.name.clone();
    AppendSession::new(client, stream, config)
}
/// Creates a [`Producer`] for this stream.
///
/// The producer receives clones of this handle's client and stream name,
/// together with `config`.
pub fn producer(&self, config: ProducerConfig) -> Producer {
    let client = self.client.clone();
    let stream = self.name.clone();
    Producer::new(client, stream, config)
}
/// Opens a read session on this stream and returns it as a stream of
/// [`ReadBatch`] results.
///
/// `input.start`, `input.stop` and `input.ignore_command_records` are passed
/// to `session::read_session` along with clones of this handle's client and
/// stream name. Errors produced by the session are converted into
/// [`S2Error`]; successful batches are passed through unchanged.
///
/// # Errors
///
/// Returns an [`S2Error`] if `session::read_session` itself fails.
pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
    let client = self.client.clone();
    let stream = self.name.clone();
    let batches = session::read_session(
        client,
        stream,
        input.start.into(),
        input.stop.into(),
        input.ignore_command_records,
    )
    .await?;
    // Only the error side needs converting.
    Ok(Box::pin(batches.map(|item| item.map_err(Into::into))))
}
}