1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5#[cfg(feature = "_hidden")]
6use crate::types::{EnsureBasinInput, EnsureStreamInput, ProvisionResult};
7use crate::{
8 api::{AccountClient, BaseClient, BasinClient},
9 producer::{Producer, ProducerConfig},
10 session::{self, AppendSession, AppendSessionConfig},
11 types::{
12 AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
13 CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionKey,
14 GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
15 ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
16 ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
17 ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
18 StreamName, StreamPosition, Streaming,
19 },
20};
21
22#[derive(Debug, Clone)]
23pub struct S2 {
25 client: AccountClient,
26}
27
28impl S2 {
29 pub fn new(config: S2Config) -> Result<Self, S2Error> {
31 let base_client = BaseClient::init(&config)?;
32 Ok(Self {
33 client: AccountClient::init(config, base_client),
34 })
35 }
36
37 #[doc(hidden)]
38 #[cfg(feature = "_hidden")]
39 pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
40 where
41 C: Connect + Clone + Send + Sync + 'static,
42 {
43 let base_client = BaseClient::init_with_connector(&config, connector)?;
44 Ok(Self {
45 client: AccountClient::init(config, base_client),
46 })
47 }
48
49 pub fn basin(&self, name: BasinName) -> S2Basin {
51 S2Basin {
52 client: self.client.basin_client(name),
53 }
54 }
55
56 pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
60 let response = self.client.list_basins(input.into()).await?;
61 Ok(Page::new(
62 response
63 .basins
64 .into_iter()
65 .map(TryInto::try_into)
66 .collect::<Result<Vec<_>, _>>()?,
67 response.has_more,
68 ))
69 }
70
71 pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
73 let s2 = self.clone();
74 let prefix = input.prefix;
75 let start_after = input.start_after;
76 let include_deleted = input.include_deleted;
77 let mut input = ListBasinsInput::new()
78 .with_prefix(prefix)
79 .with_start_after(start_after);
80 Box::pin(async_stream::try_stream! {
81 loop {
82 let page = s2.list_basins(input.clone()).await?;
83 let start_after = page.values.last().map(|info| info.name.clone().into());
84
85 for info in page.values {
86 if !include_deleted && info.deleted_at.is_some() {
87 continue;
88 }
89 yield info;
90 }
91
92 if page.has_more && let Some(start_after) = start_after {
93 input = input.with_start_after(start_after);
94 } else {
95 break;
96 }
97 }
98 })
99 }
100
101 pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
103 let (request, idempotency_token) = input.into();
104 let info = self.client.create_basin(request, idempotency_token).await?;
105 Ok(info.try_into()?)
106 }
107
108 #[doc(hidden)]
118 #[cfg(feature = "_hidden")]
119 pub async fn ensure_basin(
120 &self,
121 input: EnsureBasinInput,
122 ) -> Result<ProvisionResult<BasinInfo>, S2Error> {
123 let (name, request) = input.into();
124 Ok(self
125 .client
126 .ensure_basin(name, request)
127 .await?
128 .try_map(BasinInfo::try_from)?)
129 }
130
131 pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
133 let config = self.client.get_basin_config(name).await?;
134 Ok(config.into())
135 }
136
137 pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
139 Ok(self
140 .client
141 .delete_basin(input.name, input.ignore_not_found)
142 .await?)
143 }
144
145 pub async fn reconfigure_basin(
147 &self,
148 input: ReconfigureBasinInput,
149 ) -> Result<BasinConfig, S2Error> {
150 let config = self
151 .client
152 .reconfigure_basin(input.name, input.config.into())
153 .await?;
154 Ok(config.into())
155 }
156
157 pub async fn list_access_tokens(
161 &self,
162 input: ListAccessTokensInput,
163 ) -> Result<Page<AccessTokenInfo>, S2Error> {
164 let response = self.client.list_access_tokens(input.into()).await?;
165 Ok(Page::new(
166 response
167 .access_tokens
168 .into_iter()
169 .map(TryInto::try_into)
170 .collect::<Result<Vec<_>, _>>()?,
171 response.has_more,
172 ))
173 }
174
175 pub fn list_all_access_tokens(
177 &self,
178 input: ListAllAccessTokensInput,
179 ) -> Streaming<AccessTokenInfo> {
180 let s2 = self.clone();
181 let prefix = input.prefix;
182 let start_after = input.start_after;
183 let mut input = ListAccessTokensInput::new()
184 .with_prefix(prefix)
185 .with_start_after(start_after);
186 Box::pin(async_stream::try_stream! {
187 loop {
188 let page = s2.list_access_tokens(input.clone()).await?;
189
190 let start_after = page.values.last().map(|info| info.id.clone().into());
191 for info in page.values {
192 yield info;
193 }
194
195 if page.has_more && let Some(start_after) = start_after {
196 input = input.with_start_after(start_after);
197 } else {
198 break;
199 }
200 }
201 })
202 }
203
204 pub async fn issue_access_token(
206 &self,
207 input: IssueAccessTokenInput,
208 ) -> Result<String, S2Error> {
209 let response = self.client.issue_access_token(input.into()).await?;
210 Ok(response.access_token)
211 }
212
213 pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
215 Ok(self.client.revoke_access_token(id).await?)
216 }
217
218 pub async fn get_account_metrics(
220 &self,
221 input: GetAccountMetricsInput,
222 ) -> Result<Vec<Metric>, S2Error> {
223 let response = self.client.get_account_metrics(input.into()).await?;
224 Ok(response.values.into_iter().map(Into::into).collect())
225 }
226
227 pub async fn get_basin_metrics(
229 &self,
230 input: GetBasinMetricsInput,
231 ) -> Result<Vec<Metric>, S2Error> {
232 let (name, request) = input.into();
233 let response = self.client.get_basin_metrics(name, request).await?;
234 Ok(response.values.into_iter().map(Into::into).collect())
235 }
236
237 pub async fn get_stream_metrics(
239 &self,
240 input: GetStreamMetricsInput,
241 ) -> Result<Vec<Metric>, S2Error> {
242 let (basin_name, stream_name, request) = input.into();
243 let response = self
244 .client
245 .get_stream_metrics(basin_name, stream_name, request)
246 .await?;
247 Ok(response.values.into_iter().map(Into::into).collect())
248 }
249}
250
251#[derive(Debug, Clone)]
252pub struct S2Basin {
256 client: BasinClient,
257}
258
259impl S2Basin {
260 pub fn stream(&self, name: StreamName) -> S2Stream {
262 S2Stream {
263 client: self.client.clone(),
264 name,
265 encryption: None,
266 }
267 }
268
269 pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
273 let response = self.client.list_streams(input.into()).await?;
274 Ok(Page::new(
275 response
276 .streams
277 .into_iter()
278 .map(TryInto::try_into)
279 .collect::<Result<Vec<_>, _>>()?,
280 response.has_more,
281 ))
282 }
283
284 pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
286 let basin = self.clone();
287 let prefix = input.prefix;
288 let start_after = input.start_after;
289 let include_deleted = input.include_deleted;
290 let mut input = ListStreamsInput::new()
291 .with_prefix(prefix)
292 .with_start_after(start_after);
293 Box::pin(async_stream::try_stream! {
294 loop {
295 let page = basin.list_streams(input.clone()).await?;
296 let start_after = page.values.last().map(|info| info.name.clone().into());
297
298 for info in page.values {
299 if !include_deleted && info.deleted_at.is_some() {
300 continue;
301 }
302 yield info;
303 }
304
305 if page.has_more && let Some(start_after) = start_after {
306 input = input.with_start_after(start_after);
307 } else {
308 break;
309 }
310 }
311 })
312 }
313
314 pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
316 let (request, idempotency_token) = input.into();
317 let info = self
318 .client
319 .create_stream(request, idempotency_token)
320 .await?;
321 Ok(info.try_into()?)
322 }
323
324 #[doc(hidden)]
334 #[cfg(feature = "_hidden")]
335 pub async fn ensure_stream(
336 &self,
337 input: EnsureStreamInput,
338 ) -> Result<ProvisionResult<StreamInfo>, S2Error> {
339 let (name, config) = input.into();
340 Ok(self
341 .client
342 .ensure_stream(name, config)
343 .await?
344 .try_map(StreamInfo::try_from)?)
345 }
346
347 pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
349 let config = self.client.get_stream_config(name).await?;
350 Ok(config.into())
351 }
352
353 pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
355 Ok(self
356 .client
357 .delete_stream(input.name, input.ignore_not_found)
358 .await?)
359 }
360
361 pub async fn reconfigure_stream(
363 &self,
364 input: ReconfigureStreamInput,
365 ) -> Result<StreamConfig, S2Error> {
366 let config = self
367 .client
368 .reconfigure_stream(input.name, input.config.into())
369 .await?;
370 Ok(config.into())
371 }
372}
373
374#[derive(Debug, Clone)]
375pub struct S2Stream {
379 client: BasinClient,
380 name: StreamName,
381 encryption: Option<EncryptionKey>,
382}
383
384impl S2Stream {
385 pub fn with_encryption_key(self, encryption: EncryptionKey) -> Self {
387 Self {
388 encryption: Some(encryption),
389 ..self
390 }
391 }
392
393 pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
395 let response = self.client.check_tail(&self.name).await?;
396 Ok(response.tail.into())
397 }
398
399 pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
401 let ack = self
402 .client
403 .append(
404 &self.name,
405 input.into(),
406 self.encryption.as_ref(),
407 self.client.config.retry.append_retry_policy,
408 )
409 .await?;
410 Ok(ack.into())
411 }
412
413 pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
415 let batch = self
416 .client
417 .read(
418 &self.name,
419 input.start.into(),
420 input.stop.into(),
421 self.encryption.as_ref(),
422 )
423 .await?;
424 let mut batch = ReadBatch::from_api(batch);
425 if input.ignore_command_records {
426 batch.records.retain(|r| !r.is_command_record());
427 }
428 Ok(batch)
429 }
430
431 pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
433 AppendSession::new(
434 self.client.clone(),
435 self.name.clone(),
436 self.encryption.clone(),
437 config,
438 )
439 }
440
441 pub fn producer(&self, config: ProducerConfig) -> Producer {
443 Producer::new(
444 self.client.clone(),
445 self.name.clone(),
446 self.encryption.clone(),
447 config,
448 )
449 }
450
451 pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
453 let batches = session::read_session(
454 self.client.clone(),
455 self.name.clone(),
456 self.encryption.clone(),
457 input.start.into(),
458 input.stop.into(),
459 input.ignore_command_records,
460 )
461 .await?;
462 Ok(Box::pin(batches.map(|res| match res {
463 Ok(batch) => Ok(batch),
464 Err(err) => Err(err.into()),
465 })))
466 }
467}