1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5#[cfg(feature = "_hidden")]
6use crate::types::{
7 CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, CreateOrReconfigured,
8};
9use crate::{
10 api::{AccountClient, BaseClient, BasinClient},
11 producer::{Producer, ProducerConfig},
12 session::{self, AppendSession, AppendSessionConfig},
13 types::{
14 AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
15 BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput,
16 GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
17 ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
18 ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
19 ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
20 StreamName, StreamPosition, Streaming,
21 },
22};
23
24#[derive(Debug, Clone)]
25pub struct S2 {
27 client: AccountClient,
28}
29
30impl S2 {
31 pub fn new(config: S2Config) -> Result<Self, S2Error> {
33 let base_client = BaseClient::init(&config)?;
34 Ok(Self {
35 client: AccountClient::init(config, base_client),
36 })
37 }
38
39 #[doc(hidden)]
40 #[cfg(feature = "_hidden")]
41 pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
42 where
43 C: Connect + Clone + Send + Sync + 'static,
44 {
45 let base_client = BaseClient::init_with_connector(&config, connector)?;
46 Ok(Self {
47 client: AccountClient::init(config, base_client),
48 })
49 }
50
51 pub fn basin(&self, name: BasinName) -> S2Basin {
53 S2Basin {
54 client: self.client.basin_client(name),
55 }
56 }
57
58 pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
62 let response = self.client.list_basins(input.into()).await?;
63 Ok(Page::new(
64 response
65 .basins
66 .into_iter()
67 .map(Into::into)
68 .collect::<Vec<_>>(),
69 response.has_more,
70 ))
71 }
72
73 pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
75 let s2 = self.clone();
76 let prefix = input.prefix;
77 let start_after = input.start_after;
78 let include_deleted = input.include_deleted;
79 let mut input = ListBasinsInput::new()
80 .with_prefix(prefix)
81 .with_start_after(start_after);
82 Box::pin(async_stream::try_stream! {
83 loop {
84 let page = s2.list_basins(input.clone()).await?;
85
86 let start_after = page.values.last().map(|info| info.name.clone().into());
87 for info in page.values {
88 if !include_deleted && info.state == BasinState::Deleting {
89 continue;
90 }
91 yield info;
92 }
93
94 if page.has_more && let Some(start_after) = start_after {
95 input = input.with_start_after(start_after);
96 } else {
97 break;
98 }
99 }
100 })
101 }
102
103 pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
105 let (request, idempotency_token) = input.into();
106 let info = self.client.create_basin(request, idempotency_token).await?;
107 Ok(info.into())
108 }
109
110 #[doc(hidden)]
118 #[cfg(feature = "_hidden")]
119 pub async fn create_or_reconfigure_basin(
120 &self,
121 input: CreateOrReconfigureBasinInput,
122 ) -> Result<CreateOrReconfigured<BasinInfo>, S2Error> {
123 let (name, request) = input.into();
124 let (was_created, info) = self
125 .client
126 .create_or_reconfigure_basin(name, request)
127 .await?;
128 let info = info.into();
129 Ok(if was_created {
130 CreateOrReconfigured::Created(info)
131 } else {
132 CreateOrReconfigured::Reconfigured(info)
133 })
134 }
135
136 pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
138 let config = self.client.get_basin_config(name).await?;
139 Ok(config.into())
140 }
141
142 pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
144 Ok(self
145 .client
146 .delete_basin(input.name, input.ignore_not_found)
147 .await?)
148 }
149
150 pub async fn reconfigure_basin(
152 &self,
153 input: ReconfigureBasinInput,
154 ) -> Result<BasinConfig, S2Error> {
155 let config = self
156 .client
157 .reconfigure_basin(input.name, input.config.into())
158 .await?;
159 Ok(config.into())
160 }
161
162 pub async fn list_access_tokens(
166 &self,
167 input: ListAccessTokensInput,
168 ) -> Result<Page<AccessTokenInfo>, S2Error> {
169 let response = self.client.list_access_tokens(input.into()).await?;
170 Ok(Page::new(
171 response
172 .access_tokens
173 .into_iter()
174 .map(TryInto::try_into)
175 .collect::<Result<Vec<_>, _>>()?,
176 response.has_more,
177 ))
178 }
179
180 pub fn list_all_access_tokens(
182 &self,
183 input: ListAllAccessTokensInput,
184 ) -> Streaming<AccessTokenInfo> {
185 let s2 = self.clone();
186 let prefix = input.prefix;
187 let start_after = input.start_after;
188 let mut input = ListAccessTokensInput::new()
189 .with_prefix(prefix)
190 .with_start_after(start_after);
191 Box::pin(async_stream::try_stream! {
192 loop {
193 let page = s2.list_access_tokens(input.clone()).await?;
194
195 let start_after = page.values.last().map(|info| info.id.clone().into());
196 for info in page.values {
197 yield info;
198 }
199
200 if page.has_more && let Some(start_after) = start_after {
201 input = input.with_start_after(start_after);
202 } else {
203 break;
204 }
205 }
206 })
207 }
208
209 pub async fn issue_access_token(
211 &self,
212 input: IssueAccessTokenInput,
213 ) -> Result<String, S2Error> {
214 let response = self.client.issue_access_token(input.into()).await?;
215 Ok(response.access_token)
216 }
217
218 pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
220 Ok(self.client.revoke_access_token(id).await?)
221 }
222
223 pub async fn get_account_metrics(
225 &self,
226 input: GetAccountMetricsInput,
227 ) -> Result<Vec<Metric>, S2Error> {
228 let response = self.client.get_account_metrics(input.into()).await?;
229 Ok(response.values.into_iter().map(Into::into).collect())
230 }
231
232 pub async fn get_basin_metrics(
234 &self,
235 input: GetBasinMetricsInput,
236 ) -> Result<Vec<Metric>, S2Error> {
237 let (name, request) = input.into();
238 let response = self.client.get_basin_metrics(name, request).await?;
239 Ok(response.values.into_iter().map(Into::into).collect())
240 }
241
242 pub async fn get_stream_metrics(
244 &self,
245 input: GetStreamMetricsInput,
246 ) -> Result<Vec<Metric>, S2Error> {
247 let (basin_name, stream_name, request) = input.into();
248 let response = self
249 .client
250 .get_stream_metrics(basin_name, stream_name, request)
251 .await?;
252 Ok(response.values.into_iter().map(Into::into).collect())
253 }
254}
255
256#[derive(Debug, Clone)]
257pub struct S2Basin {
261 client: BasinClient,
262}
263
264impl S2Basin {
265 pub fn stream(&self, name: StreamName) -> S2Stream {
267 S2Stream {
268 client: self.client.clone(),
269 name,
270 }
271 }
272
273 pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
277 let response = self.client.list_streams(input.into()).await?;
278 Ok(Page::new(
279 response
280 .streams
281 .into_iter()
282 .map(TryInto::try_into)
283 .collect::<Result<Vec<_>, _>>()?,
284 response.has_more,
285 ))
286 }
287
288 pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
290 let basin = self.clone();
291 let prefix = input.prefix;
292 let start_after = input.start_after;
293 let include_deleted = input.include_deleted;
294 let mut input = ListStreamsInput::new()
295 .with_prefix(prefix)
296 .with_start_after(start_after);
297 Box::pin(async_stream::try_stream! {
298 loop {
299 let page = basin.list_streams(input.clone()).await?;
300
301 let start_after = page.values.last().map(|info| info.name.clone().into());
302 for info in page.values {
303 if !include_deleted && info.deleted_at.is_some() {
304 continue;
305 }
306 yield info;
307 }
308
309 if page.has_more && let Some(start_after) = start_after {
310 input = input.with_start_after(start_after);
311 } else {
312 break;
313 }
314 }
315 })
316 }
317
318 pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
320 let (request, idempotency_token) = input.into();
321 let info = self
322 .client
323 .create_stream(request, idempotency_token)
324 .await?;
325 Ok(info.try_into()?)
326 }
327
328 #[doc(hidden)]
336 #[cfg(feature = "_hidden")]
337 pub async fn create_or_reconfigure_stream(
338 &self,
339 input: CreateOrReconfigureStreamInput,
340 ) -> Result<CreateOrReconfigured<StreamInfo>, S2Error> {
341 let (name, config) = input.into();
342 let (was_created, info) = self
343 .client
344 .create_or_reconfigure_stream(name, config)
345 .await?;
346 let info = info.try_into()?;
347 Ok(if was_created {
348 CreateOrReconfigured::Created(info)
349 } else {
350 CreateOrReconfigured::Reconfigured(info)
351 })
352 }
353
354 pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
356 let config = self.client.get_stream_config(name).await?;
357 Ok(config.into())
358 }
359
360 pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
362 Ok(self
363 .client
364 .delete_stream(input.name, input.ignore_not_found)
365 .await?)
366 }
367
368 pub async fn reconfigure_stream(
370 &self,
371 input: ReconfigureStreamInput,
372 ) -> Result<StreamConfig, S2Error> {
373 let config = self
374 .client
375 .reconfigure_stream(input.name, input.config.into())
376 .await?;
377 Ok(config.into())
378 }
379}
380
381#[derive(Debug, Clone)]
382pub struct S2Stream {
386 client: BasinClient,
387 name: StreamName,
388}
389
390impl S2Stream {
391 pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
393 let response = self.client.check_tail(&self.name).await?;
394 Ok(response.tail.into())
395 }
396
397 pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
399 let retry_enabled = self
400 .client
401 .config
402 .retry
403 .append_retry_policy
404 .is_compliant(&input);
405 let ack = self
406 .client
407 .append(&self.name, input.into(), retry_enabled)
408 .await?;
409 Ok(ack.into())
410 }
411
412 pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
414 let batch = self
415 .client
416 .read(&self.name, input.start.into(), input.stop.into())
417 .await?;
418 Ok(ReadBatch::from_api(batch, input.ignore_command_records))
419 }
420
421 pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
423 AppendSession::new(self.client.clone(), self.name.clone(), config)
424 }
425
426 pub fn producer(&self, config: ProducerConfig) -> Producer {
428 Producer::new(self.client.clone(), self.name.clone(), config)
429 }
430
431 pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
433 let batches = session::read_session(
434 self.client.clone(),
435 self.name.clone(),
436 input.start.into(),
437 input.stop.into(),
438 input.ignore_command_records,
439 )
440 .await?;
441 Ok(Box::pin(batches.map(|res| match res {
442 Ok(batch) => Ok(batch),
443 Err(err) => Err(err.into()),
444 })))
445 }
446}