Skip to main content

s2_sdk/
ops.rs

1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5use crate::{
6    api::{AccountClient, BaseClient, BasinClient},
7    producer::{Producer, ProducerConfig},
8    session::{self, AppendSession, AppendSessionConfig},
9    types::{
10        AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
11        CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionKey,
12        EnsureBasinInput, EnsureOutput, EnsureStreamInput, GetAccountMetricsInput,
13        GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput, ListAccessTokensInput,
14        ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput, ListBasinsInput,
15        ListStreamsInput, Metric, Page, ReadBatch, ReadInput, ReconfigureBasinInput,
16        ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo, StreamName,
17        StreamPosition, Streaming,
18    },
19};
20
21#[derive(Debug, Clone)]
22/// An S2 account.
23pub struct S2 {
24    client: AccountClient,
25}
26
27impl S2 {
28    /// Create a new [`S2`].
29    pub fn new(config: S2Config) -> Result<Self, S2Error> {
30        let base_client = BaseClient::init(&config)?;
31        Ok(Self {
32            client: AccountClient::init(config, base_client),
33        })
34    }
35
36    #[doc(hidden)]
37    #[cfg(feature = "_hidden")]
38    pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
39    where
40        C: Connect + Clone + Send + Sync + 'static,
41    {
42        let base_client = BaseClient::init_with_connector(&config, connector)?;
43        Ok(Self {
44            client: AccountClient::init(config, base_client),
45        })
46    }
47
48    /// Get an [`S2Basin`].
49    pub fn basin(&self, name: BasinName) -> S2Basin {
50        S2Basin {
51            client: self.client.basin_client(name),
52        }
53    }
54
55    /// List a page of basins.
56    ///
57    /// See [`list_all_basins`](crate::S2::list_all_basins) for automatic pagination.
58    pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
59        let response = self.client.list_basins(input.into()).await?;
60        Ok(Page::new(
61            response
62                .basins
63                .into_iter()
64                .map(TryInto::try_into)
65                .collect::<Result<Vec<_>, _>>()?,
66            response.has_more,
67        ))
68    }
69
70    /// List all basins, paginating automatically.
71    pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
72        let s2 = self.clone();
73        let prefix = input.prefix;
74        let start_after = input.start_after;
75        let include_deleted = input.include_deleted;
76        let mut input = ListBasinsInput::new()
77            .with_prefix(prefix)
78            .with_start_after(start_after);
79        Box::pin(async_stream::try_stream! {
80            loop {
81                let page = s2.list_basins(input.clone()).await?;
82                let start_after = page.values.last().map(|info| info.name.clone().into());
83
84                for info in page.values {
85                    if !include_deleted && info.deleted_at.is_some() {
86                        continue;
87                    }
88                    yield info;
89                }
90
91                if page.has_more && let Some(start_after) = start_after {
92                    input = input.with_start_after(start_after);
93                } else {
94                    break;
95                }
96            }
97        })
98    }
99
100    /// Create a basin.
101    pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
102        let (request, idempotency_token) = input.into();
103        let info = self.client.create_basin(request, idempotency_token).await?;
104        Ok(info.try_into()?)
105    }
106
107    /// Ensure a basin.
108    ///
109    /// If the basin doesn't exist, creates the basin with specified configuration.
110    ///
111    /// If the basin already exists:
112    /// - Its configuration is updated to the specified configuration, if different.
113    /// - Its configuration is unchanged, if the specified configuration is same.
114    pub async fn ensure_basin(
115        &self,
116        input: EnsureBasinInput,
117    ) -> Result<EnsureOutput<BasinInfo>, S2Error> {
118        let (name, request) = input.into();
119        Ok(self
120            .client
121            .ensure_basin(name, request)
122            .await?
123            .try_map(BasinInfo::try_from)?
124            .into())
125    }
126
127    /// Get basin configuration.
128    pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
129        let config = self.client.get_basin_config(name).await?;
130        Ok(config.into())
131    }
132
133    /// Delete a basin.
134    pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
135        Ok(self
136            .client
137            .delete_basin(input.name, input.ignore_not_found)
138            .await?)
139    }
140
141    /// Reconfigure a basin.
142    pub async fn reconfigure_basin(
143        &self,
144        input: ReconfigureBasinInput,
145    ) -> Result<BasinConfig, S2Error> {
146        let config = self
147            .client
148            .reconfigure_basin(input.name, input.config.into())
149            .await?;
150        Ok(config.into())
151    }
152
153    /// List a page of access tokens.
154    ///
155    /// See [`list_all_access_tokens`](crate::S2::list_all_access_tokens) for automatic pagination.
156    pub async fn list_access_tokens(
157        &self,
158        input: ListAccessTokensInput,
159    ) -> Result<Page<AccessTokenInfo>, S2Error> {
160        let response = self.client.list_access_tokens(input.into()).await?;
161        Ok(Page::new(
162            response
163                .access_tokens
164                .into_iter()
165                .map(TryInto::try_into)
166                .collect::<Result<Vec<_>, _>>()?,
167            response.has_more,
168        ))
169    }
170
171    /// List all access tokens, paginating automatically.
172    pub fn list_all_access_tokens(
173        &self,
174        input: ListAllAccessTokensInput,
175    ) -> Streaming<AccessTokenInfo> {
176        let s2 = self.clone();
177        let prefix = input.prefix;
178        let start_after = input.start_after;
179        let mut input = ListAccessTokensInput::new()
180            .with_prefix(prefix)
181            .with_start_after(start_after);
182        Box::pin(async_stream::try_stream! {
183            loop {
184                let page = s2.list_access_tokens(input.clone()).await?;
185
186                let start_after = page.values.last().map(|info| info.id.clone().into());
187                for info in page.values {
188                    yield info;
189                }
190
191                if page.has_more && let Some(start_after) = start_after {
192                    input = input.with_start_after(start_after);
193                } else {
194                    break;
195                }
196            }
197        })
198    }
199
200    /// Issue an access token.
201    pub async fn issue_access_token(
202        &self,
203        input: IssueAccessTokenInput,
204    ) -> Result<String, S2Error> {
205        let response = self.client.issue_access_token(input.into()).await?;
206        Ok(response.access_token)
207    }
208
209    /// Revoke an access token.
210    pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
211        Ok(self.client.revoke_access_token(id).await?)
212    }
213
214    /// Get account metrics.
215    pub async fn get_account_metrics(
216        &self,
217        input: GetAccountMetricsInput,
218    ) -> Result<Vec<Metric>, S2Error> {
219        let response = self.client.get_account_metrics(input.into()).await?;
220        Ok(response.values.into_iter().map(Into::into).collect())
221    }
222
223    /// Get basin metrics.
224    pub async fn get_basin_metrics(
225        &self,
226        input: GetBasinMetricsInput,
227    ) -> Result<Vec<Metric>, S2Error> {
228        let (name, request) = input.into();
229        let response = self.client.get_basin_metrics(name, request).await?;
230        Ok(response.values.into_iter().map(Into::into).collect())
231    }
232
233    /// Get stream metrics.
234    pub async fn get_stream_metrics(
235        &self,
236        input: GetStreamMetricsInput,
237    ) -> Result<Vec<Metric>, S2Error> {
238        let (basin_name, stream_name, request) = input.into();
239        let response = self
240            .client
241            .get_stream_metrics(basin_name, stream_name, request)
242            .await?;
243        Ok(response.values.into_iter().map(Into::into).collect())
244    }
245}
246
247#[derive(Debug, Clone)]
248/// A basin in an S2 account.
249///
250/// See [`S2::basin`].
251pub struct S2Basin {
252    client: BasinClient,
253}
254
255impl S2Basin {
256    /// Get an [`S2Stream`].
257    pub fn stream(&self, name: StreamName) -> S2Stream {
258        S2Stream {
259            client: self.client.clone(),
260            name,
261            encryption: None,
262        }
263    }
264
265    /// List a page of streams.
266    ///
267    /// See [`list_all_streams`](crate::S2Basin::list_all_streams) for automatic pagination.
268    pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
269        let response = self.client.list_streams(input.into()).await?;
270        Ok(Page::new(
271            response
272                .streams
273                .into_iter()
274                .map(TryInto::try_into)
275                .collect::<Result<Vec<_>, _>>()?,
276            response.has_more,
277        ))
278    }
279
280    /// List all streams, paginating automatically.
281    pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
282        let basin = self.clone();
283        let prefix = input.prefix;
284        let start_after = input.start_after;
285        let include_deleted = input.include_deleted;
286        let mut input = ListStreamsInput::new()
287            .with_prefix(prefix)
288            .with_start_after(start_after);
289        Box::pin(async_stream::try_stream! {
290            loop {
291                let page = basin.list_streams(input.clone()).await?;
292                let start_after = page.values.last().map(|info| info.name.clone().into());
293
294                for info in page.values {
295                    if !include_deleted && info.deleted_at.is_some() {
296                        continue;
297                    }
298                    yield info;
299                }
300
301                if page.has_more && let Some(start_after) = start_after {
302                    input = input.with_start_after(start_after);
303                } else {
304                    break;
305                }
306            }
307        })
308    }
309
310    /// Create a stream.
311    pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
312        let (request, idempotency_token) = input.into();
313        let info = self
314            .client
315            .create_stream(request, idempotency_token)
316            .await?;
317        Ok(info.try_into()?)
318    }
319
320    /// Ensure a stream.
321    ///
322    /// If the stream doesn't exist, creates the stream with specified configuration.
323    ///
324    /// If the stream already exists:
325    /// - Its configuration is updated to the specified configuration, if different.
326    /// - Its configuration is unchanged, if the specified configuration is same.
327    pub async fn ensure_stream(
328        &self,
329        input: EnsureStreamInput,
330    ) -> Result<EnsureOutput<StreamInfo>, S2Error> {
331        let (name, config) = input.into();
332        Ok(self
333            .client
334            .ensure_stream(name, config)
335            .await?
336            .try_map(StreamInfo::try_from)?
337            .into())
338    }
339
340    /// Get stream configuration.
341    pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
342        let config = self.client.get_stream_config(name).await?;
343        Ok(config.into())
344    }
345
346    /// Delete a stream.
347    pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
348        Ok(self
349            .client
350            .delete_stream(input.name, input.ignore_not_found)
351            .await?)
352    }
353
354    /// Reconfigure a stream.
355    pub async fn reconfigure_stream(
356        &self,
357        input: ReconfigureStreamInput,
358    ) -> Result<StreamConfig, S2Error> {
359        let config = self
360            .client
361            .reconfigure_stream(input.name, input.config.into())
362            .await?;
363        Ok(config.into())
364    }
365}
366
367#[derive(Debug, Clone)]
368/// A stream in an S2 basin.
369///
370/// See [`S2Basin::stream`].
371pub struct S2Stream {
372    client: BasinClient,
373    name: StreamName,
374    encryption: Option<EncryptionKey>,
375}
376
377impl S2Stream {
378    /// Set the encryption key for this stream handle.
379    pub fn with_encryption_key(self, encryption: EncryptionKey) -> Self {
380        Self {
381            encryption: Some(encryption),
382            ..self
383        }
384    }
385
386    /// Check tail position.
387    pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
388        let response = self.client.check_tail(&self.name).await?;
389        Ok(response.tail.into())
390    }
391
392    /// Append records.
393    pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
394        let ack = self
395            .client
396            .append(
397                &self.name,
398                input.into(),
399                self.encryption.as_ref(),
400                self.client.config.retry.append_retry_policy,
401            )
402            .await?;
403        Ok(ack.into())
404    }
405
406    /// Read records.
407    pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
408        let batch = self
409            .client
410            .read(
411                &self.name,
412                input.start.into(),
413                input.stop.into(),
414                self.encryption.as_ref(),
415            )
416            .await?;
417        let mut batch = ReadBatch::from_api(batch);
418        if input.ignore_command_records {
419            batch.records.retain(|r| !r.is_command_record());
420        }
421        Ok(batch)
422    }
423
424    /// Create an append session for submitting [`AppendInput`]s.
425    pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
426        AppendSession::new(
427            self.client.clone(),
428            self.name.clone(),
429            self.encryption.clone(),
430            config,
431        )
432    }
433
434    /// Create a producer for submitting individual [`AppendRecord`](crate::types::AppendRecord)s.
435    pub fn producer(&self, config: ProducerConfig) -> Producer {
436        Producer::new(
437            self.client.clone(),
438            self.name.clone(),
439            self.encryption.clone(),
440            config,
441        )
442    }
443
444    /// Create a read session.
445    pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
446        let batches = session::read_session(
447            self.client.clone(),
448            self.name.clone(),
449            self.encryption.clone(),
450            input.start.into(),
451            input.stop.into(),
452            input.ignore_command_records,
453        )
454        .await?;
455        Ok(Box::pin(batches.map(|res| match res {
456            Ok(batch) => Ok(batch),
457            Err(err) => Err(err.into()),
458        })))
459    }
460}