Skip to main content

s2_sdk/
ops.rs

1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5use crate::{
6    api::{AccountClient, BaseClient, BasinClient},
7    producer::{Producer, ProducerConfig},
8    session::{self, AppendSession, AppendSessionConfig},
9    types::{
10        AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
11        CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionKey,
12        EnsureBasinInput, EnsureOutput, EnsureStreamInput, GetAccountMetricsInput,
13        GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput, ListAccessTokensInput,
14        ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput, ListBasinsInput,
15        ListStreamsInput, LocationInfo, LocationName, Metric, Page, ReadBatch, ReadInput,
16        ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
17        StreamName, StreamPosition, Streaming,
18    },
19};
20
/// An S2 account.
///
/// Wraps the account-level API client. Construct one with [`S2::new`], then use
/// [`S2::basin`] to obtain a handle scoped to a single basin.
#[derive(Debug, Clone)]
pub struct S2 {
    // Account-scoped client that every method on this handle delegates to.
    client: AccountClient,
}
26
27impl S2 {
28    /// Create a new [`S2`].
29    pub fn new(config: S2Config) -> Result<Self, S2Error> {
30        let base_client = BaseClient::init(&config)?;
31        Ok(Self {
32            client: AccountClient::init(config, base_client),
33        })
34    }
35
36    #[doc(hidden)]
37    #[cfg(feature = "_hidden")]
38    pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
39    where
40        C: Connect + Clone + Send + Sync + 'static,
41    {
42        let base_client = BaseClient::init_with_connector(&config, connector)?;
43        Ok(Self {
44            client: AccountClient::init(config, base_client),
45        })
46    }
47
48    /// Get an [`S2Basin`].
49    pub fn basin(&self, name: BasinName) -> S2Basin {
50        S2Basin {
51            client: self.client.basin_client(name),
52        }
53    }
54
55    /// List a page of basins.
56    ///
57    /// See [`list_all_basins`](crate::S2::list_all_basins) for automatic pagination.
58    pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
59        let response = self.client.list_basins(input.into()).await?;
60        Ok(Page::new(
61            response
62                .basins
63                .into_iter()
64                .map(TryInto::try_into)
65                .collect::<Result<Vec<_>, _>>()?,
66            response.has_more,
67        ))
68    }
69
70    /// List all basins, paginating automatically.
71    pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
72        let s2 = self.clone();
73        let prefix = input.prefix;
74        let start_after = input.start_after;
75        let include_deleted = input.include_deleted;
76        let mut input = ListBasinsInput::new()
77            .with_prefix(prefix)
78            .with_start_after(start_after);
79        Box::pin(async_stream::try_stream! {
80            loop {
81                let page = s2.list_basins(input.clone()).await?;
82                let start_after = page.values.last().map(|info| info.name.clone().into());
83
84                for info in page.values {
85                    if !include_deleted && info.deleted_at.is_some() {
86                        continue;
87                    }
88                    yield info;
89                }
90
91                if page.has_more && let Some(start_after) = start_after {
92                    input = input.with_start_after(start_after);
93                } else {
94                    break;
95                }
96            }
97        })
98    }
99
100    /// Create a basin.
101    pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
102        let (request, idempotency_token) = input.into();
103        let info = self.client.create_basin(request, idempotency_token).await?;
104        Ok(info.try_into()?)
105    }
106
107    /// Ensure a basin.
108    ///
109    /// If the basin doesn't exist, creates the basin with specified configuration.
110    ///
111    /// If the basin already exists:
112    /// - Its configuration is updated to the specified configuration, if different.
113    /// - Its configuration is unchanged, if the specified configuration is same.
114    pub async fn ensure_basin(
115        &self,
116        input: EnsureBasinInput,
117    ) -> Result<EnsureOutput<BasinInfo>, S2Error> {
118        let (name, request) = input.into();
119        Ok(self
120            .client
121            .ensure_basin(name, request)
122            .await?
123            .try_map(BasinInfo::try_from)?
124            .into())
125    }
126
127    /// Get basin configuration.
128    pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
129        let config = self.client.get_basin_config(name).await?;
130        Ok(config.into())
131    }
132
133    /// Delete a basin.
134    pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
135        Ok(self
136            .client
137            .delete_basin(input.name, input.ignore_not_found)
138            .await?)
139    }
140
141    /// Reconfigure a basin.
142    pub async fn reconfigure_basin(
143        &self,
144        input: ReconfigureBasinInput,
145    ) -> Result<BasinConfig, S2Error> {
146        let config = self
147            .client
148            .reconfigure_basin(input.name, input.config.into())
149            .await?;
150        Ok(config.into())
151    }
152
153    /// List a page of access tokens.
154    ///
155    /// See [`list_all_access_tokens`](crate::S2::list_all_access_tokens) for automatic pagination.
156    pub async fn list_access_tokens(
157        &self,
158        input: ListAccessTokensInput,
159    ) -> Result<Page<AccessTokenInfo>, S2Error> {
160        let response = self.client.list_access_tokens(input.into()).await?;
161        Ok(Page::new(
162            response
163                .access_tokens
164                .into_iter()
165                .map(TryInto::try_into)
166                .collect::<Result<Vec<_>, _>>()?,
167            response.has_more,
168        ))
169    }
170
171    /// List all access tokens, paginating automatically.
172    pub fn list_all_access_tokens(
173        &self,
174        input: ListAllAccessTokensInput,
175    ) -> Streaming<AccessTokenInfo> {
176        let s2 = self.clone();
177        let prefix = input.prefix;
178        let start_after = input.start_after;
179        let mut input = ListAccessTokensInput::new()
180            .with_prefix(prefix)
181            .with_start_after(start_after);
182        Box::pin(async_stream::try_stream! {
183            loop {
184                let page = s2.list_access_tokens(input.clone()).await?;
185
186                let start_after = page.values.last().map(|info| info.id.clone().into());
187                for info in page.values {
188                    yield info;
189                }
190
191                if page.has_more && let Some(start_after) = start_after {
192                    input = input.with_start_after(start_after);
193                } else {
194                    break;
195                }
196            }
197        })
198    }
199
200    /// Issue an access token.
201    pub async fn issue_access_token(
202        &self,
203        input: IssueAccessTokenInput,
204    ) -> Result<String, S2Error> {
205        let response = self.client.issue_access_token(input.into()).await?;
206        Ok(response.access_token)
207    }
208
209    /// Revoke an access token.
210    pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
211        Ok(self.client.revoke_access_token(id).await?)
212    }
213
214    /// List locations.
215    pub async fn list_locations(&self) -> Result<Vec<LocationInfo>, S2Error> {
216        let response = self.client.list_locations().await?;
217        Ok(response.into_iter().map(Into::into).collect())
218    }
219
220    /// Get the default location.
221    pub async fn get_default_location(&self) -> Result<LocationInfo, S2Error> {
222        Ok(self.client.get_default_location().await?.into())
223    }
224
225    /// Set the default location.
226    pub async fn set_default_location(
227        &self,
228        location: LocationName,
229    ) -> Result<LocationInfo, S2Error> {
230        Ok(self.client.set_default_location(location).await?.into())
231    }
232
233    /// Get account metrics.
234    pub async fn get_account_metrics(
235        &self,
236        input: GetAccountMetricsInput,
237    ) -> Result<Vec<Metric>, S2Error> {
238        let response = self.client.get_account_metrics(input.into()).await?;
239        Ok(response.values.into_iter().map(Into::into).collect())
240    }
241
242    /// Get basin metrics.
243    pub async fn get_basin_metrics(
244        &self,
245        input: GetBasinMetricsInput,
246    ) -> Result<Vec<Metric>, S2Error> {
247        let (name, request) = input.into();
248        let response = self.client.get_basin_metrics(name, request).await?;
249        Ok(response.values.into_iter().map(Into::into).collect())
250    }
251
252    /// Get stream metrics.
253    pub async fn get_stream_metrics(
254        &self,
255        input: GetStreamMetricsInput,
256    ) -> Result<Vec<Metric>, S2Error> {
257        let (basin_name, stream_name, request) = input.into();
258        let response = self
259            .client
260            .get_stream_metrics(basin_name, stream_name, request)
261            .await?;
262        Ok(response.values.into_iter().map(Into::into).collect())
263    }
264}
265
/// A basin in an S2 account.
///
/// Obtained from [`S2::basin`]. Use [`S2Basin::stream`] to get a handle scoped to a
/// single stream in this basin.
#[derive(Debug, Clone)]
pub struct S2Basin {
    // Basin-scoped client that every method on this handle delegates to.
    client: BasinClient,
}
273
274impl S2Basin {
275    /// Get an [`S2Stream`].
276    pub fn stream(&self, name: StreamName) -> S2Stream {
277        S2Stream {
278            client: self.client.clone(),
279            name,
280            encryption: None,
281        }
282    }
283
284    /// List a page of streams.
285    ///
286    /// See [`list_all_streams`](crate::S2Basin::list_all_streams) for automatic pagination.
287    pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
288        let response = self.client.list_streams(input.into()).await?;
289        Ok(Page::new(
290            response
291                .streams
292                .into_iter()
293                .map(TryInto::try_into)
294                .collect::<Result<Vec<_>, _>>()?,
295            response.has_more,
296        ))
297    }
298
299    /// List all streams, paginating automatically.
300    pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
301        let basin = self.clone();
302        let prefix = input.prefix;
303        let start_after = input.start_after;
304        let include_deleted = input.include_deleted;
305        let mut input = ListStreamsInput::new()
306            .with_prefix(prefix)
307            .with_start_after(start_after);
308        Box::pin(async_stream::try_stream! {
309            loop {
310                let page = basin.list_streams(input.clone()).await?;
311                let start_after = page.values.last().map(|info| info.name.clone().into());
312
313                for info in page.values {
314                    if !include_deleted && info.deleted_at.is_some() {
315                        continue;
316                    }
317                    yield info;
318                }
319
320                if page.has_more && let Some(start_after) = start_after {
321                    input = input.with_start_after(start_after);
322                } else {
323                    break;
324                }
325            }
326        })
327    }
328
329    /// Create a stream.
330    pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
331        let (request, idempotency_token) = input.into();
332        let info = self
333            .client
334            .create_stream(request, idempotency_token)
335            .await?;
336        Ok(info.try_into()?)
337    }
338
339    /// Ensure a stream.
340    ///
341    /// If the stream doesn't exist, creates the stream with specified configuration.
342    ///
343    /// If the stream already exists:
344    /// - Its configuration is updated to the specified configuration, if different.
345    /// - Its configuration is unchanged, if the specified configuration is same.
346    pub async fn ensure_stream(
347        &self,
348        input: EnsureStreamInput,
349    ) -> Result<EnsureOutput<StreamInfo>, S2Error> {
350        let (name, config) = input.into();
351        Ok(self
352            .client
353            .ensure_stream(name, config)
354            .await?
355            .try_map(StreamInfo::try_from)?
356            .into())
357    }
358
359    /// Get stream configuration.
360    pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
361        let config = self.client.get_stream_config(name).await?;
362        Ok(config.into())
363    }
364
365    /// Delete a stream.
366    pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
367        Ok(self
368            .client
369            .delete_stream(input.name, input.ignore_not_found)
370            .await?)
371    }
372
373    /// Reconfigure a stream.
374    pub async fn reconfigure_stream(
375        &self,
376        input: ReconfigureStreamInput,
377    ) -> Result<StreamConfig, S2Error> {
378        let config = self
379            .client
380            .reconfigure_stream(input.name, input.config.into())
381            .await?;
382        Ok(config.into())
383    }
384}
385
/// A stream in an S2 basin.
///
/// Obtained from [`S2Basin::stream`], which leaves the encryption key unset; use
/// [`S2Stream::with_encryption_key`] to attach one.
#[derive(Debug, Clone)]
pub struct S2Stream {
    // Basin-scoped client used for every request made through this handle.
    client: BasinClient,
    // Name of the stream this handle targets.
    name: StreamName,
    // Optional key handed to append/read calls and sessions; `None` until set explicitly.
    encryption: Option<EncryptionKey>,
}
395
396impl S2Stream {
397    /// Set the encryption key for this stream handle.
398    pub fn with_encryption_key(self, encryption: EncryptionKey) -> Self {
399        Self {
400            encryption: Some(encryption),
401            ..self
402        }
403    }
404
405    /// Check tail position.
406    pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
407        let response = self.client.check_tail(&self.name).await?;
408        Ok(response.tail.into())
409    }
410
411    /// Append records.
412    pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
413        let ack = self
414            .client
415            .append(
416                &self.name,
417                input.into(),
418                self.encryption.as_ref(),
419                self.client.config.retry.append_retry_policy,
420            )
421            .await?;
422        Ok(ack.into())
423    }
424
425    /// Read records.
426    pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
427        let batch = self
428            .client
429            .read(
430                &self.name,
431                input.start.into(),
432                input.stop.into(),
433                self.encryption.as_ref(),
434            )
435            .await?;
436        let mut batch = ReadBatch::from_api(batch);
437        if input.ignore_command_records {
438            batch.records.retain(|r| !r.is_command_record());
439        }
440        Ok(batch)
441    }
442
443    /// Create an append session for submitting [`AppendInput`]s.
444    pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
445        AppendSession::new(
446            self.client.clone(),
447            self.name.clone(),
448            self.encryption.clone(),
449            config,
450        )
451    }
452
453    /// Create a producer for submitting individual [`AppendRecord`](crate::types::AppendRecord)s.
454    pub fn producer(&self, config: ProducerConfig) -> Producer {
455        Producer::new(
456            self.client.clone(),
457            self.name.clone(),
458            self.encryption.clone(),
459            config,
460        )
461    }
462
463    /// Create a read session.
464    pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
465        let batches = session::read_session(
466            self.client.clone(),
467            self.name.clone(),
468            self.encryption.clone(),
469            input.start.into(),
470            input.stop.into(),
471            input.ignore_command_records,
472        )
473        .await?;
474        Ok(Box::pin(batches.map(|res| match res {
475            Ok(batch) => Ok(batch),
476            Err(err) => Err(err.into()),
477        })))
478    }
479}