1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5use crate::{
6 api::{AccountClient, BaseClient, BasinClient},
7 producer::{Producer, ProducerConfig},
8 session::{self, AppendSession, AppendSessionConfig},
9 types::{
10 AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
11 CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionKey,
12 EnsureBasinInput, EnsureOutput, EnsureStreamInput, GetAccountMetricsInput,
13 GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput, ListAccessTokensInput,
14 ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput, ListBasinsInput,
15 ListStreamsInput, LocationInfo, LocationName, Metric, Page, ReadBatch, ReadInput,
16 ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
17 StreamName, StreamPosition, Streaming,
18 },
19};
20
21#[derive(Debug, Clone)]
22pub struct S2 {
24 client: AccountClient,
25}
26
27impl S2 {
28 pub fn new(config: S2Config) -> Result<Self, S2Error> {
30 let base_client = BaseClient::init(&config)?;
31 Ok(Self {
32 client: AccountClient::init(config, base_client),
33 })
34 }
35
36 #[doc(hidden)]
37 #[cfg(feature = "_hidden")]
38 pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
39 where
40 C: Connect + Clone + Send + Sync + 'static,
41 {
42 let base_client = BaseClient::init_with_connector(&config, connector)?;
43 Ok(Self {
44 client: AccountClient::init(config, base_client),
45 })
46 }
47
48 pub fn basin(&self, name: BasinName) -> S2Basin {
50 S2Basin {
51 client: self.client.basin_client(name),
52 }
53 }
54
55 pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
59 let response = self.client.list_basins(input.into()).await?;
60 Ok(Page::new(
61 response
62 .basins
63 .into_iter()
64 .map(TryInto::try_into)
65 .collect::<Result<Vec<_>, _>>()?,
66 response.has_more,
67 ))
68 }
69
70 pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
72 let s2 = self.clone();
73 let prefix = input.prefix;
74 let start_after = input.start_after;
75 let include_deleted = input.include_deleted;
76 let mut input = ListBasinsInput::new()
77 .with_prefix(prefix)
78 .with_start_after(start_after);
79 Box::pin(async_stream::try_stream! {
80 loop {
81 let page = s2.list_basins(input.clone()).await?;
82 let start_after = page.values.last().map(|info| info.name.clone().into());
83
84 for info in page.values {
85 if !include_deleted && info.deleted_at.is_some() {
86 continue;
87 }
88 yield info;
89 }
90
91 if page.has_more && let Some(start_after) = start_after {
92 input = input.with_start_after(start_after);
93 } else {
94 break;
95 }
96 }
97 })
98 }
99
100 pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
102 let (request, idempotency_token) = input.into();
103 let info = self.client.create_basin(request, idempotency_token).await?;
104 Ok(info.try_into()?)
105 }
106
107 pub async fn ensure_basin(
115 &self,
116 input: EnsureBasinInput,
117 ) -> Result<EnsureOutput<BasinInfo>, S2Error> {
118 let (name, request) = input.into();
119 Ok(self
120 .client
121 .ensure_basin(name, request)
122 .await?
123 .try_map(BasinInfo::try_from)?
124 .into())
125 }
126
127 pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
129 let config = self.client.get_basin_config(name).await?;
130 Ok(config.into())
131 }
132
133 pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
135 Ok(self
136 .client
137 .delete_basin(input.name, input.ignore_not_found)
138 .await?)
139 }
140
141 pub async fn reconfigure_basin(
143 &self,
144 input: ReconfigureBasinInput,
145 ) -> Result<BasinConfig, S2Error> {
146 let config = self
147 .client
148 .reconfigure_basin(input.name, input.config.into())
149 .await?;
150 Ok(config.into())
151 }
152
153 pub async fn list_access_tokens(
157 &self,
158 input: ListAccessTokensInput,
159 ) -> Result<Page<AccessTokenInfo>, S2Error> {
160 let response = self.client.list_access_tokens(input.into()).await?;
161 Ok(Page::new(
162 response
163 .access_tokens
164 .into_iter()
165 .map(TryInto::try_into)
166 .collect::<Result<Vec<_>, _>>()?,
167 response.has_more,
168 ))
169 }
170
171 pub fn list_all_access_tokens(
173 &self,
174 input: ListAllAccessTokensInput,
175 ) -> Streaming<AccessTokenInfo> {
176 let s2 = self.clone();
177 let prefix = input.prefix;
178 let start_after = input.start_after;
179 let mut input = ListAccessTokensInput::new()
180 .with_prefix(prefix)
181 .with_start_after(start_after);
182 Box::pin(async_stream::try_stream! {
183 loop {
184 let page = s2.list_access_tokens(input.clone()).await?;
185
186 let start_after = page.values.last().map(|info| info.id.clone().into());
187 for info in page.values {
188 yield info;
189 }
190
191 if page.has_more && let Some(start_after) = start_after {
192 input = input.with_start_after(start_after);
193 } else {
194 break;
195 }
196 }
197 })
198 }
199
200 pub async fn issue_access_token(
202 &self,
203 input: IssueAccessTokenInput,
204 ) -> Result<String, S2Error> {
205 let response = self.client.issue_access_token(input.into()).await?;
206 Ok(response.access_token)
207 }
208
209 pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
211 Ok(self.client.revoke_access_token(id).await?)
212 }
213
214 pub async fn list_locations(&self) -> Result<Vec<LocationInfo>, S2Error> {
216 let response = self.client.list_locations().await?;
217 Ok(response.into_iter().map(Into::into).collect())
218 }
219
220 pub async fn get_default_location(&self) -> Result<LocationInfo, S2Error> {
222 Ok(self.client.get_default_location().await?.into())
223 }
224
225 pub async fn set_default_location(
227 &self,
228 location: LocationName,
229 ) -> Result<LocationInfo, S2Error> {
230 Ok(self.client.set_default_location(location).await?.into())
231 }
232
233 pub async fn get_account_metrics(
235 &self,
236 input: GetAccountMetricsInput,
237 ) -> Result<Vec<Metric>, S2Error> {
238 let response = self.client.get_account_metrics(input.into()).await?;
239 Ok(response.values.into_iter().map(Into::into).collect())
240 }
241
242 pub async fn get_basin_metrics(
244 &self,
245 input: GetBasinMetricsInput,
246 ) -> Result<Vec<Metric>, S2Error> {
247 let (name, request) = input.into();
248 let response = self.client.get_basin_metrics(name, request).await?;
249 Ok(response.values.into_iter().map(Into::into).collect())
250 }
251
252 pub async fn get_stream_metrics(
254 &self,
255 input: GetStreamMetricsInput,
256 ) -> Result<Vec<Metric>, S2Error> {
257 let (basin_name, stream_name, request) = input.into();
258 let response = self
259 .client
260 .get_stream_metrics(basin_name, stream_name, request)
261 .await?;
262 Ok(response.values.into_iter().map(Into::into).collect())
263 }
264}
265
266#[derive(Debug, Clone)]
267pub struct S2Basin {
271 client: BasinClient,
272}
273
274impl S2Basin {
275 pub fn stream(&self, name: StreamName) -> S2Stream {
277 S2Stream {
278 client: self.client.clone(),
279 name,
280 encryption: None,
281 }
282 }
283
284 pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
288 let response = self.client.list_streams(input.into()).await?;
289 Ok(Page::new(
290 response
291 .streams
292 .into_iter()
293 .map(TryInto::try_into)
294 .collect::<Result<Vec<_>, _>>()?,
295 response.has_more,
296 ))
297 }
298
299 pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
301 let basin = self.clone();
302 let prefix = input.prefix;
303 let start_after = input.start_after;
304 let include_deleted = input.include_deleted;
305 let mut input = ListStreamsInput::new()
306 .with_prefix(prefix)
307 .with_start_after(start_after);
308 Box::pin(async_stream::try_stream! {
309 loop {
310 let page = basin.list_streams(input.clone()).await?;
311 let start_after = page.values.last().map(|info| info.name.clone().into());
312
313 for info in page.values {
314 if !include_deleted && info.deleted_at.is_some() {
315 continue;
316 }
317 yield info;
318 }
319
320 if page.has_more && let Some(start_after) = start_after {
321 input = input.with_start_after(start_after);
322 } else {
323 break;
324 }
325 }
326 })
327 }
328
329 pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
331 let (request, idempotency_token) = input.into();
332 let info = self
333 .client
334 .create_stream(request, idempotency_token)
335 .await?;
336 Ok(info.try_into()?)
337 }
338
339 pub async fn ensure_stream(
347 &self,
348 input: EnsureStreamInput,
349 ) -> Result<EnsureOutput<StreamInfo>, S2Error> {
350 let (name, config) = input.into();
351 Ok(self
352 .client
353 .ensure_stream(name, config)
354 .await?
355 .try_map(StreamInfo::try_from)?
356 .into())
357 }
358
359 pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
361 let config = self.client.get_stream_config(name).await?;
362 Ok(config.into())
363 }
364
365 pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
367 Ok(self
368 .client
369 .delete_stream(input.name, input.ignore_not_found)
370 .await?)
371 }
372
373 pub async fn reconfigure_stream(
375 &self,
376 input: ReconfigureStreamInput,
377 ) -> Result<StreamConfig, S2Error> {
378 let config = self
379 .client
380 .reconfigure_stream(input.name, input.config.into())
381 .await?;
382 Ok(config.into())
383 }
384}
385
386#[derive(Debug, Clone)]
387pub struct S2Stream {
391 client: BasinClient,
392 name: StreamName,
393 encryption: Option<EncryptionKey>,
394}
395
396impl S2Stream {
397 pub fn with_encryption_key(self, encryption: EncryptionKey) -> Self {
399 Self {
400 encryption: Some(encryption),
401 ..self
402 }
403 }
404
405 pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
407 let response = self.client.check_tail(&self.name).await?;
408 Ok(response.tail.into())
409 }
410
411 pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
413 let ack = self
414 .client
415 .append(
416 &self.name,
417 input.into(),
418 self.encryption.as_ref(),
419 self.client.config.retry.append_retry_policy,
420 )
421 .await?;
422 Ok(ack.into())
423 }
424
425 pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
427 let batch = self
428 .client
429 .read(
430 &self.name,
431 input.start.into(),
432 input.stop.into(),
433 self.encryption.as_ref(),
434 )
435 .await?;
436 let mut batch = ReadBatch::from_api(batch);
437 if input.ignore_command_records {
438 batch.records.retain(|r| !r.is_command_record());
439 }
440 Ok(batch)
441 }
442
443 pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
445 AppendSession::new(
446 self.client.clone(),
447 self.name.clone(),
448 self.encryption.clone(),
449 config,
450 )
451 }
452
453 pub fn producer(&self, config: ProducerConfig) -> Producer {
455 Producer::new(
456 self.client.clone(),
457 self.name.clone(),
458 self.encryption.clone(),
459 config,
460 )
461 }
462
463 pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
465 let batches = session::read_session(
466 self.client.clone(),
467 self.name.clone(),
468 self.encryption.clone(),
469 input.start.into(),
470 input.stop.into(),
471 input.ignore_command_records,
472 )
473 .await?;
474 Ok(Box::pin(batches.map(|res| match res {
475 Ok(batch) => Ok(batch),
476 Err(err) => Err(err.into()),
477 })))
478 }
479}