1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5#[cfg(feature = "_hidden")]
6use crate::types::{
7 CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, CreateOrReconfigured,
8};
9use crate::{
10 api::{AccountClient, BaseClient, BasinClient},
11 producer::{Producer, ProducerConfig},
12 session::{self, AppendSession, AppendSessionConfig},
13 types::{
14 AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
15 CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionKey,
16 GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
17 ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
18 ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
19 ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
20 StreamName, StreamPosition, Streaming,
21 },
22};
23
24#[derive(Debug, Clone)]
25pub struct S2 {
27 client: AccountClient,
28}
29
30impl S2 {
31 pub fn new(config: S2Config) -> Result<Self, S2Error> {
33 let base_client = BaseClient::init(&config)?;
34 Ok(Self {
35 client: AccountClient::init(config, base_client),
36 })
37 }
38
39 #[doc(hidden)]
40 #[cfg(feature = "_hidden")]
41 pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
42 where
43 C: Connect + Clone + Send + Sync + 'static,
44 {
45 let base_client = BaseClient::init_with_connector(&config, connector)?;
46 Ok(Self {
47 client: AccountClient::init(config, base_client),
48 })
49 }
50
51 pub fn basin(&self, name: BasinName) -> S2Basin {
53 S2Basin {
54 client: self.client.basin_client(name),
55 }
56 }
57
58 pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
62 let response = self.client.list_basins(input.into()).await?;
63 Ok(Page::new(
64 response
65 .basins
66 .into_iter()
67 .map(TryInto::try_into)
68 .collect::<Result<Vec<_>, _>>()?,
69 response.has_more,
70 ))
71 }
72
73 pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
75 let s2 = self.clone();
76 let prefix = input.prefix;
77 let start_after = input.start_after;
78 let include_deleted = input.include_deleted;
79 let mut input = ListBasinsInput::new()
80 .with_prefix(prefix)
81 .with_start_after(start_after);
82 Box::pin(async_stream::try_stream! {
83 loop {
84 let page = s2.list_basins(input.clone()).await?;
85 let start_after = page.values.last().map(|info| info.name.clone().into());
86
87 for info in page.values {
88 if !include_deleted && info.deleted_at.is_some() {
89 continue;
90 }
91 yield info;
92 }
93
94 if page.has_more && let Some(start_after) = start_after {
95 input = input.with_start_after(start_after);
96 } else {
97 break;
98 }
99 }
100 })
101 }
102
103 pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
105 let (request, idempotency_token) = input.into();
106 let info = self.client.create_basin(request, idempotency_token).await?;
107 Ok(info.try_into()?)
108 }
109
110 #[doc(hidden)]
118 #[cfg(feature = "_hidden")]
119 pub async fn create_or_reconfigure_basin(
120 &self,
121 input: CreateOrReconfigureBasinInput,
122 ) -> Result<CreateOrReconfigured<BasinInfo>, S2Error> {
123 let (name, request) = input.into();
124 let (was_created, info) = self
125 .client
126 .create_or_reconfigure_basin(name, request)
127 .await?;
128 let info = info.try_into()?;
129 Ok(if was_created {
130 CreateOrReconfigured::Created(info)
131 } else {
132 CreateOrReconfigured::Reconfigured(info)
133 })
134 }
135
136 pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
138 let config = self.client.get_basin_config(name).await?;
139 Ok(config.into())
140 }
141
142 pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
144 Ok(self
145 .client
146 .delete_basin(input.name, input.ignore_not_found)
147 .await?)
148 }
149
150 pub async fn reconfigure_basin(
152 &self,
153 input: ReconfigureBasinInput,
154 ) -> Result<BasinConfig, S2Error> {
155 let config = self
156 .client
157 .reconfigure_basin(input.name, input.config.into())
158 .await?;
159 Ok(config.into())
160 }
161
162 pub async fn list_access_tokens(
166 &self,
167 input: ListAccessTokensInput,
168 ) -> Result<Page<AccessTokenInfo>, S2Error> {
169 let response = self.client.list_access_tokens(input.into()).await?;
170 Ok(Page::new(
171 response
172 .access_tokens
173 .into_iter()
174 .map(TryInto::try_into)
175 .collect::<Result<Vec<_>, _>>()?,
176 response.has_more,
177 ))
178 }
179
180 pub fn list_all_access_tokens(
182 &self,
183 input: ListAllAccessTokensInput,
184 ) -> Streaming<AccessTokenInfo> {
185 let s2 = self.clone();
186 let prefix = input.prefix;
187 let start_after = input.start_after;
188 let mut input = ListAccessTokensInput::new()
189 .with_prefix(prefix)
190 .with_start_after(start_after);
191 Box::pin(async_stream::try_stream! {
192 loop {
193 let page = s2.list_access_tokens(input.clone()).await?;
194
195 let start_after = page.values.last().map(|info| info.id.clone().into());
196 for info in page.values {
197 yield info;
198 }
199
200 if page.has_more && let Some(start_after) = start_after {
201 input = input.with_start_after(start_after);
202 } else {
203 break;
204 }
205 }
206 })
207 }
208
209 pub async fn issue_access_token(
211 &self,
212 input: IssueAccessTokenInput,
213 ) -> Result<String, S2Error> {
214 let response = self.client.issue_access_token(input.into()).await?;
215 Ok(response.access_token)
216 }
217
218 pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
220 Ok(self.client.revoke_access_token(id).await?)
221 }
222
223 pub async fn get_account_metrics(
225 &self,
226 input: GetAccountMetricsInput,
227 ) -> Result<Vec<Metric>, S2Error> {
228 let response = self.client.get_account_metrics(input.into()).await?;
229 Ok(response.values.into_iter().map(Into::into).collect())
230 }
231
232 pub async fn get_basin_metrics(
234 &self,
235 input: GetBasinMetricsInput,
236 ) -> Result<Vec<Metric>, S2Error> {
237 let (name, request) = input.into();
238 let response = self.client.get_basin_metrics(name, request).await?;
239 Ok(response.values.into_iter().map(Into::into).collect())
240 }
241
242 pub async fn get_stream_metrics(
244 &self,
245 input: GetStreamMetricsInput,
246 ) -> Result<Vec<Metric>, S2Error> {
247 let (basin_name, stream_name, request) = input.into();
248 let response = self
249 .client
250 .get_stream_metrics(basin_name, stream_name, request)
251 .await?;
252 Ok(response.values.into_iter().map(Into::into).collect())
253 }
254}
255
256#[derive(Debug, Clone)]
257pub struct S2Basin {
261 client: BasinClient,
262}
263
264impl S2Basin {
265 pub fn stream(&self, name: StreamName) -> S2Stream {
267 S2Stream {
268 client: self.client.clone(),
269 name,
270 encryption: None,
271 }
272 }
273
274 pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
278 let response = self.client.list_streams(input.into()).await?;
279 Ok(Page::new(
280 response
281 .streams
282 .into_iter()
283 .map(TryInto::try_into)
284 .collect::<Result<Vec<_>, _>>()?,
285 response.has_more,
286 ))
287 }
288
289 pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
291 let basin = self.clone();
292 let prefix = input.prefix;
293 let start_after = input.start_after;
294 let include_deleted = input.include_deleted;
295 let mut input = ListStreamsInput::new()
296 .with_prefix(prefix)
297 .with_start_after(start_after);
298 Box::pin(async_stream::try_stream! {
299 loop {
300 let page = basin.list_streams(input.clone()).await?;
301 let start_after = page.values.last().map(|info| info.name.clone().into());
302
303 for info in page.values {
304 if !include_deleted && info.deleted_at.is_some() {
305 continue;
306 }
307 yield info;
308 }
309
310 if page.has_more && let Some(start_after) = start_after {
311 input = input.with_start_after(start_after);
312 } else {
313 break;
314 }
315 }
316 })
317 }
318
319 pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
321 let (request, idempotency_token) = input.into();
322 let info = self
323 .client
324 .create_stream(request, idempotency_token)
325 .await?;
326 Ok(info.try_into()?)
327 }
328
329 #[doc(hidden)]
337 #[cfg(feature = "_hidden")]
338 pub async fn create_or_reconfigure_stream(
339 &self,
340 input: CreateOrReconfigureStreamInput,
341 ) -> Result<CreateOrReconfigured<StreamInfo>, S2Error> {
342 let (name, config) = input.into();
343 let (was_created, info) = self
344 .client
345 .create_or_reconfigure_stream(name, config)
346 .await?;
347 let info = info.try_into()?;
348 Ok(if was_created {
349 CreateOrReconfigured::Created(info)
350 } else {
351 CreateOrReconfigured::Reconfigured(info)
352 })
353 }
354
355 pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
357 let config = self.client.get_stream_config(name).await?;
358 Ok(config.into())
359 }
360
361 pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
363 Ok(self
364 .client
365 .delete_stream(input.name, input.ignore_not_found)
366 .await?)
367 }
368
369 pub async fn reconfigure_stream(
371 &self,
372 input: ReconfigureStreamInput,
373 ) -> Result<StreamConfig, S2Error> {
374 let config = self
375 .client
376 .reconfigure_stream(input.name, input.config.into())
377 .await?;
378 Ok(config.into())
379 }
380}
381
382#[derive(Debug, Clone)]
383pub struct S2Stream {
387 client: BasinClient,
388 name: StreamName,
389 encryption: Option<EncryptionKey>,
390}
391
392impl S2Stream {
393 pub fn with_encryption_key(self, encryption: EncryptionKey) -> Self {
395 Self {
396 encryption: Some(encryption),
397 ..self
398 }
399 }
400
401 pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
403 let response = self.client.check_tail(&self.name).await?;
404 Ok(response.tail.into())
405 }
406
407 pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
409 let ack = self
410 .client
411 .append(
412 &self.name,
413 input.into(),
414 self.encryption.as_ref(),
415 self.client.config.retry.append_retry_policy,
416 )
417 .await?;
418 Ok(ack.into())
419 }
420
421 pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
423 let batch = self
424 .client
425 .read(
426 &self.name,
427 input.start.into(),
428 input.stop.into(),
429 self.encryption.as_ref(),
430 )
431 .await?;
432 let mut batch = ReadBatch::from_api(batch);
433 if input.ignore_command_records {
434 batch.records.retain(|r| !r.is_command_record());
435 }
436 Ok(batch)
437 }
438
439 pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
441 AppendSession::new(
442 self.client.clone(),
443 self.name.clone(),
444 self.encryption.clone(),
445 config,
446 )
447 }
448
449 pub fn producer(&self, config: ProducerConfig) -> Producer {
451 Producer::new(
452 self.client.clone(),
453 self.name.clone(),
454 self.encryption.clone(),
455 config,
456 )
457 }
458
459 pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
461 let batches = session::read_session(
462 self.client.clone(),
463 self.name.clone(),
464 self.encryption.clone(),
465 input.start.into(),
466 input.stop.into(),
467 input.ignore_command_records,
468 )
469 .await?;
470 Ok(Box::pin(batches.map(|res| match res {
471 Ok(batch) => Ok(batch),
472 Err(err) => Err(err.into()),
473 })))
474 }
475}