s2_sdk/
ops.rs

1use futures::StreamExt;
2
3use crate::{
4    api::{AccountClient, BasinClient},
5    producer::{Producer, ProducerConfig},
6    session::{self, AppendSession, AppendSessionConfig},
7    types::{
8        AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
9        BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput,
10        GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
11        ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
12        ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
13        ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
14        StreamName, StreamPosition, Streaming,
15    },
16};
17
18#[derive(Debug, Clone)]
19/// An S2 account.
20pub struct S2 {
21    client: AccountClient,
22}
23
24impl S2 {
25    /// Create a new [`S2`].
26    pub fn new(config: S2Config) -> Result<Self, S2Error> {
27        Ok(Self {
28            client: AccountClient::init(config)?,
29        })
30    }
31
32    /// Get an [`S2Basin`].
33    pub fn basin(&self, name: BasinName) -> S2Basin {
34        S2Basin {
35            client: self.client.basin_client(name),
36        }
37    }
38
39    /// List a page of basins.
40    ///
41    /// See [`list_all_basins`](crate::S2::list_all_basins) for automatic pagination.
42    pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
43        let response = self.client.list_basins(input.into()).await?;
44        Ok(Page::new(
45            response
46                .basins
47                .into_iter()
48                .map(Into::into)
49                .collect::<Vec<_>>(),
50            response.has_more,
51        ))
52    }
53
54    /// List all basins, paginating automatically.
55    pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
56        let s2 = self.clone();
57        let prefix = input.prefix;
58        let start_after = input.start_after;
59        let include_deleted = input.include_deleted;
60        let mut input = ListBasinsInput::new()
61            .with_prefix(prefix)
62            .with_start_after(start_after);
63        Box::pin(async_stream::try_stream! {
64            loop {
65                let page = s2.list_basins(input.clone()).await?;
66
67                let start_after = page.values.last().map(|info| info.name.clone().into());
68                for info in page.values {
69                    if !include_deleted && info.state == BasinState::Deleting {
70                        continue;
71                    }
72                    yield info;
73                }
74
75                if page.has_more && let Some(start_after) = start_after {
76                    input = input.with_start_after(start_after);
77                } else {
78                    break;
79                }
80            }
81        })
82    }
83
84    /// Create a basin.
85    pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
86        let (request, idempotency_token) = input.into();
87        let info = self.client.create_basin(request, idempotency_token).await?;
88        Ok(info.into())
89    }
90
91    /// Get basin configuration.
92    pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
93        let config = self.client.get_basin_config(name).await?;
94        Ok(config.into())
95    }
96
97    /// Delete a basin.
98    pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
99        Ok(self
100            .client
101            .delete_basin(input.name, input.ignore_not_found)
102            .await?)
103    }
104
105    /// Reconfigure a basin.
106    pub async fn reconfigure_basin(
107        &self,
108        input: ReconfigureBasinInput,
109    ) -> Result<BasinConfig, S2Error> {
110        let config = self
111            .client
112            .reconfigure_basin(input.name, input.config.into())
113            .await?;
114        Ok(config.into())
115    }
116
117    /// List a page of access tokens.
118    ///
119    /// See [`list_all_access_tokens`](crate::S2::list_all_access_tokens) for automatic pagination.
120    pub async fn list_access_tokens(
121        &self,
122        input: ListAccessTokensInput,
123    ) -> Result<Page<AccessTokenInfo>, S2Error> {
124        let response = self.client.list_access_tokens(input.into()).await?;
125        Ok(Page::new(
126            response
127                .access_tokens
128                .into_iter()
129                .map(TryInto::try_into)
130                .collect::<Result<Vec<_>, _>>()?,
131            response.has_more,
132        ))
133    }
134
135    /// List all access tokens, paginating automatically.
136    pub fn list_all_access_tokens(
137        &self,
138        input: ListAllAccessTokensInput,
139    ) -> Streaming<AccessTokenInfo> {
140        let s2 = self.clone();
141        let prefix = input.prefix;
142        let start_after = input.start_after;
143        let mut input = ListAccessTokensInput::new()
144            .with_prefix(prefix)
145            .with_start_after(start_after);
146        Box::pin(async_stream::try_stream! {
147            loop {
148                let page = s2.list_access_tokens(input.clone()).await?;
149
150                let start_after = page.values.last().map(|info| info.id.clone().into());
151                for info in page.values {
152                    yield info;
153                }
154
155                if page.has_more && let Some(start_after) = start_after {
156                    input = input.with_start_after(start_after);
157                } else {
158                    break;
159                }
160            }
161        })
162    }
163
164    /// Issue an access token.
165    pub async fn issue_access_token(
166        &self,
167        input: IssueAccessTokenInput,
168    ) -> Result<String, S2Error> {
169        let response = self.client.issue_access_token(input.into()).await?;
170        Ok(response.access_token)
171    }
172
173    /// Revoke an access token.
174    pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
175        Ok(self.client.revoke_access_token(id).await?)
176    }
177
178    /// Get account metrics.
179    pub async fn get_account_metrics(
180        &self,
181        input: GetAccountMetricsInput,
182    ) -> Result<Vec<Metric>, S2Error> {
183        let response = self.client.get_account_metrics(input.into()).await?;
184        Ok(response.values.into_iter().map(Into::into).collect())
185    }
186
187    /// Get basin metrics.
188    pub async fn get_basin_metrics(
189        &self,
190        input: GetBasinMetricsInput,
191    ) -> Result<Vec<Metric>, S2Error> {
192        let (name, request) = input.into();
193        let response = self.client.get_basin_metrics(name, request).await?;
194        Ok(response.values.into_iter().map(Into::into).collect())
195    }
196
197    /// Get stream metrics.
198    pub async fn get_stream_metrics(
199        &self,
200        input: GetStreamMetricsInput,
201    ) -> Result<Vec<Metric>, S2Error> {
202        let (basin_name, stream_name, request) = input.into();
203        let response = self
204            .client
205            .get_stream_metrics(basin_name, stream_name, request)
206            .await?;
207        Ok(response.values.into_iter().map(Into::into).collect())
208    }
209}
210
211#[derive(Debug, Clone)]
212/// A basin in an S2 account.
213///
214/// See [`S2::basin`].
215pub struct S2Basin {
216    client: BasinClient,
217}
218
219impl S2Basin {
220    /// Get an [`S2Stream`].
221    pub fn stream(&self, name: StreamName) -> S2Stream {
222        S2Stream {
223            client: self.client.clone(),
224            name,
225        }
226    }
227
228    /// List a page of streams.
229    ///
230    /// See [`list_all_streams`](crate::S2Basin::list_all_streams) for automatic pagination.
231    pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
232        let response = self.client.list_streams(input.into()).await?;
233        Ok(Page::new(
234            response
235                .streams
236                .into_iter()
237                .map(Into::into)
238                .collect::<Vec<_>>(),
239            response.has_more,
240        ))
241    }
242
243    /// List all streams, paginating automatically.
244    pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
245        let basin = self.clone();
246        let prefix = input.prefix;
247        let start_after = input.start_after;
248        let include_deleted = input.include_deleted;
249        let mut input = ListStreamsInput::new()
250            .with_prefix(prefix)
251            .with_start_after(start_after);
252        Box::pin(async_stream::try_stream! {
253            loop {
254                let page = basin.list_streams(input.clone()).await?;
255
256                let start_after = page.values.last().map(|info| info.name.clone().into());
257                for info in page.values {
258                    if !include_deleted && info.deleted_at.is_some() {
259                        continue;
260                    }
261                    yield info;
262                }
263
264                if page.has_more && let Some(start_after) = start_after {
265                    input = input.with_start_after(start_after);
266                } else {
267                    break;
268                }
269            }
270        })
271    }
272
273    /// Create a stream.
274    pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
275        let (request, idempotency_token) = input.into();
276        let info = self
277            .client
278            .create_stream(request, idempotency_token)
279            .await?;
280        Ok(info.into())
281    }
282
283    /// Get stream configuration.
284    pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
285        let config = self.client.get_stream_config(name).await?;
286        Ok(config.into())
287    }
288
289    /// Delete a stream.
290    pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
291        Ok(self
292            .client
293            .delete_stream(input.name, input.ignore_not_found)
294            .await?)
295    }
296
297    /// Reconfigure a stream.
298    pub async fn reconfigure_stream(
299        &self,
300        input: ReconfigureStreamInput,
301    ) -> Result<StreamConfig, S2Error> {
302        let config = self
303            .client
304            .reconfigure_stream(input.name, input.config.into())
305            .await?;
306        Ok(config.into())
307    }
308}
309
310#[derive(Debug, Clone)]
311/// A stream in an S2 basin.
312///
313/// See [`S2Basin::stream`].
314pub struct S2Stream {
315    client: BasinClient,
316    name: StreamName,
317}
318
319impl S2Stream {
320    /// Check tail position.
321    pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
322        let response = self.client.check_tail(&self.name).await?;
323        Ok(response.tail.into())
324    }
325
326    /// Append records.
327    pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
328        let retry_enabled = self
329            .client
330            .config
331            .retry
332            .append_retry_policy
333            .is_compliant(&input);
334        let ack = self
335            .client
336            .append(&self.name, input.into(), retry_enabled)
337            .await?;
338        Ok(ack.into())
339    }
340
341    /// Read records.
342    pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
343        let batch = self
344            .client
345            .read(&self.name, input.start.into(), input.stop.into())
346            .await?;
347        Ok(ReadBatch::from_api(batch, input.ignore_command_records))
348    }
349
350    /// Create an append session for submitting [`AppendInput`]s.
351    pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
352        AppendSession::new(self.client.clone(), self.name.clone(), config)
353    }
354
355    /// Create a producer for submitting individual [`AppendRecord`](crate::types::AppendRecord)s.
356    pub fn producer(&self, config: ProducerConfig) -> Producer {
357        Producer::new(self.client.clone(), self.name.clone(), config)
358    }
359
360    /// Create a read session.
361    pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
362        let batches = session::read_session(
363            self.client.clone(),
364            self.name.clone(),
365            input.start.into(),
366            input.stop.into(),
367            input.ignore_command_records,
368        )
369        .await?;
370        Ok(Box::pin(batches.map(|res| match res {
371            Ok(batch) => Ok(batch),
372            Err(err) => Err(err.into()),
373        })))
374    }
375}