1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5use crate::{
6 api::{AccountClient, BaseClient, BasinClient},
7 producer::{Producer, ProducerConfig},
8 session::{self, AppendSession, AppendSessionConfig},
9 types::{
10 AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
11 CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionKey,
12 EnsureBasinInput, EnsureOutput, EnsureStreamInput, GetAccountMetricsInput,
13 GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput, ListAccessTokensInput,
14 ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput, ListBasinsInput,
15 ListStreamsInput, Metric, Page, ReadBatch, ReadInput, ReconfigureBasinInput,
16 ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo, StreamName,
17 StreamPosition, Streaming,
18 },
19};
20
21#[derive(Debug, Clone)]
22pub struct S2 {
24 client: AccountClient,
25}
26
27impl S2 {
28 pub fn new(config: S2Config) -> Result<Self, S2Error> {
30 let base_client = BaseClient::init(&config)?;
31 Ok(Self {
32 client: AccountClient::init(config, base_client),
33 })
34 }
35
36 #[doc(hidden)]
37 #[cfg(feature = "_hidden")]
38 pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
39 where
40 C: Connect + Clone + Send + Sync + 'static,
41 {
42 let base_client = BaseClient::init_with_connector(&config, connector)?;
43 Ok(Self {
44 client: AccountClient::init(config, base_client),
45 })
46 }
47
48 pub fn basin(&self, name: BasinName) -> S2Basin {
50 S2Basin {
51 client: self.client.basin_client(name),
52 }
53 }
54
55 pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
59 let response = self.client.list_basins(input.into()).await?;
60 Ok(Page::new(
61 response
62 .basins
63 .into_iter()
64 .map(TryInto::try_into)
65 .collect::<Result<Vec<_>, _>>()?,
66 response.has_more,
67 ))
68 }
69
70 pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
72 let s2 = self.clone();
73 let prefix = input.prefix;
74 let start_after = input.start_after;
75 let include_deleted = input.include_deleted;
76 let mut input = ListBasinsInput::new()
77 .with_prefix(prefix)
78 .with_start_after(start_after);
79 Box::pin(async_stream::try_stream! {
80 loop {
81 let page = s2.list_basins(input.clone()).await?;
82 let start_after = page.values.last().map(|info| info.name.clone().into());
83
84 for info in page.values {
85 if !include_deleted && info.deleted_at.is_some() {
86 continue;
87 }
88 yield info;
89 }
90
91 if page.has_more && let Some(start_after) = start_after {
92 input = input.with_start_after(start_after);
93 } else {
94 break;
95 }
96 }
97 })
98 }
99
100 pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
102 let (request, idempotency_token) = input.into();
103 let info = self.client.create_basin(request, idempotency_token).await?;
104 Ok(info.try_into()?)
105 }
106
107 pub async fn ensure_basin(
115 &self,
116 input: EnsureBasinInput,
117 ) -> Result<EnsureOutput<BasinInfo>, S2Error> {
118 let (name, request) = input.into();
119 Ok(self
120 .client
121 .ensure_basin(name, request)
122 .await?
123 .try_map(BasinInfo::try_from)?
124 .into())
125 }
126
127 pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
129 let config = self.client.get_basin_config(name).await?;
130 Ok(config.into())
131 }
132
133 pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
135 Ok(self
136 .client
137 .delete_basin(input.name, input.ignore_not_found)
138 .await?)
139 }
140
141 pub async fn reconfigure_basin(
143 &self,
144 input: ReconfigureBasinInput,
145 ) -> Result<BasinConfig, S2Error> {
146 let config = self
147 .client
148 .reconfigure_basin(input.name, input.config.into())
149 .await?;
150 Ok(config.into())
151 }
152
153 pub async fn list_access_tokens(
157 &self,
158 input: ListAccessTokensInput,
159 ) -> Result<Page<AccessTokenInfo>, S2Error> {
160 let response = self.client.list_access_tokens(input.into()).await?;
161 Ok(Page::new(
162 response
163 .access_tokens
164 .into_iter()
165 .map(TryInto::try_into)
166 .collect::<Result<Vec<_>, _>>()?,
167 response.has_more,
168 ))
169 }
170
171 pub fn list_all_access_tokens(
173 &self,
174 input: ListAllAccessTokensInput,
175 ) -> Streaming<AccessTokenInfo> {
176 let s2 = self.clone();
177 let prefix = input.prefix;
178 let start_after = input.start_after;
179 let mut input = ListAccessTokensInput::new()
180 .with_prefix(prefix)
181 .with_start_after(start_after);
182 Box::pin(async_stream::try_stream! {
183 loop {
184 let page = s2.list_access_tokens(input.clone()).await?;
185
186 let start_after = page.values.last().map(|info| info.id.clone().into());
187 for info in page.values {
188 yield info;
189 }
190
191 if page.has_more && let Some(start_after) = start_after {
192 input = input.with_start_after(start_after);
193 } else {
194 break;
195 }
196 }
197 })
198 }
199
200 pub async fn issue_access_token(
202 &self,
203 input: IssueAccessTokenInput,
204 ) -> Result<String, S2Error> {
205 let response = self.client.issue_access_token(input.into()).await?;
206 Ok(response.access_token)
207 }
208
209 pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
211 Ok(self.client.revoke_access_token(id).await?)
212 }
213
214 pub async fn get_account_metrics(
216 &self,
217 input: GetAccountMetricsInput,
218 ) -> Result<Vec<Metric>, S2Error> {
219 let response = self.client.get_account_metrics(input.into()).await?;
220 Ok(response.values.into_iter().map(Into::into).collect())
221 }
222
223 pub async fn get_basin_metrics(
225 &self,
226 input: GetBasinMetricsInput,
227 ) -> Result<Vec<Metric>, S2Error> {
228 let (name, request) = input.into();
229 let response = self.client.get_basin_metrics(name, request).await?;
230 Ok(response.values.into_iter().map(Into::into).collect())
231 }
232
233 pub async fn get_stream_metrics(
235 &self,
236 input: GetStreamMetricsInput,
237 ) -> Result<Vec<Metric>, S2Error> {
238 let (basin_name, stream_name, request) = input.into();
239 let response = self
240 .client
241 .get_stream_metrics(basin_name, stream_name, request)
242 .await?;
243 Ok(response.values.into_iter().map(Into::into).collect())
244 }
245}
246
247#[derive(Debug, Clone)]
248pub struct S2Basin {
252 client: BasinClient,
253}
254
255impl S2Basin {
256 pub fn stream(&self, name: StreamName) -> S2Stream {
258 S2Stream {
259 client: self.client.clone(),
260 name,
261 encryption: None,
262 }
263 }
264
265 pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
269 let response = self.client.list_streams(input.into()).await?;
270 Ok(Page::new(
271 response
272 .streams
273 .into_iter()
274 .map(TryInto::try_into)
275 .collect::<Result<Vec<_>, _>>()?,
276 response.has_more,
277 ))
278 }
279
280 pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
282 let basin = self.clone();
283 let prefix = input.prefix;
284 let start_after = input.start_after;
285 let include_deleted = input.include_deleted;
286 let mut input = ListStreamsInput::new()
287 .with_prefix(prefix)
288 .with_start_after(start_after);
289 Box::pin(async_stream::try_stream! {
290 loop {
291 let page = basin.list_streams(input.clone()).await?;
292 let start_after = page.values.last().map(|info| info.name.clone().into());
293
294 for info in page.values {
295 if !include_deleted && info.deleted_at.is_some() {
296 continue;
297 }
298 yield info;
299 }
300
301 if page.has_more && let Some(start_after) = start_after {
302 input = input.with_start_after(start_after);
303 } else {
304 break;
305 }
306 }
307 })
308 }
309
310 pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
312 let (request, idempotency_token) = input.into();
313 let info = self
314 .client
315 .create_stream(request, idempotency_token)
316 .await?;
317 Ok(info.try_into()?)
318 }
319
320 pub async fn ensure_stream(
328 &self,
329 input: EnsureStreamInput,
330 ) -> Result<EnsureOutput<StreamInfo>, S2Error> {
331 let (name, config) = input.into();
332 Ok(self
333 .client
334 .ensure_stream(name, config)
335 .await?
336 .try_map(StreamInfo::try_from)?
337 .into())
338 }
339
340 pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
342 let config = self.client.get_stream_config(name).await?;
343 Ok(config.into())
344 }
345
346 pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
348 Ok(self
349 .client
350 .delete_stream(input.name, input.ignore_not_found)
351 .await?)
352 }
353
354 pub async fn reconfigure_stream(
356 &self,
357 input: ReconfigureStreamInput,
358 ) -> Result<StreamConfig, S2Error> {
359 let config = self
360 .client
361 .reconfigure_stream(input.name, input.config.into())
362 .await?;
363 Ok(config.into())
364 }
365}
366
367#[derive(Debug, Clone)]
368pub struct S2Stream {
372 client: BasinClient,
373 name: StreamName,
374 encryption: Option<EncryptionKey>,
375}
376
377impl S2Stream {
378 pub fn with_encryption_key(self, encryption: EncryptionKey) -> Self {
380 Self {
381 encryption: Some(encryption),
382 ..self
383 }
384 }
385
386 pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
388 let response = self.client.check_tail(&self.name).await?;
389 Ok(response.tail.into())
390 }
391
392 pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
394 let ack = self
395 .client
396 .append(
397 &self.name,
398 input.into(),
399 self.encryption.as_ref(),
400 self.client.config.retry.append_retry_policy,
401 )
402 .await?;
403 Ok(ack.into())
404 }
405
406 pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
408 let batch = self
409 .client
410 .read(
411 &self.name,
412 input.start.into(),
413 input.stop.into(),
414 self.encryption.as_ref(),
415 )
416 .await?;
417 let mut batch = ReadBatch::from_api(batch);
418 if input.ignore_command_records {
419 batch.records.retain(|r| !r.is_command_record());
420 }
421 Ok(batch)
422 }
423
424 pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
426 AppendSession::new(
427 self.client.clone(),
428 self.name.clone(),
429 self.encryption.clone(),
430 config,
431 )
432 }
433
434 pub fn producer(&self, config: ProducerConfig) -> Producer {
436 Producer::new(
437 self.client.clone(),
438 self.name.clone(),
439 self.encryption.clone(),
440 config,
441 )
442 }
443
444 pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
446 let batches = session::read_session(
447 self.client.clone(),
448 self.name.clone(),
449 self.encryption.clone(),
450 input.start.into(),
451 input.stop.into(),
452 input.ignore_command_records,
453 )
454 .await?;
455 Ok(Box::pin(batches.map(|res| match res {
456 Ok(batch) => Ok(batch),
457 Err(err) => Err(err.into()),
458 })))
459 }
460}