Skip to main content

s2_sdk/
ops.rs

1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5use crate::{
6    api::{AccountClient, BaseClient, BasinClient},
7    producer::{Producer, ProducerConfig},
8    session::{self, AppendSession, AppendSessionConfig},
9    types::{
10        AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
11        BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput,
12        GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
13        ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
14        ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
15        ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
16        StreamName, StreamPosition, Streaming,
17    },
18};
19
20#[derive(Debug, Clone)]
21/// An S2 account.
22pub struct S2 {
23    client: AccountClient,
24}
25
26impl S2 {
27    /// Create a new [`S2`].
28    pub fn new(config: S2Config) -> Result<Self, S2Error> {
29        let base_client = BaseClient::init(&config)?;
30        Ok(Self {
31            client: AccountClient::init(config, base_client),
32        })
33    }
34
35    #[doc(hidden)]
36    #[cfg(feature = "_hidden")]
37    pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
38    where
39        C: Connect + Clone + Send + Sync + 'static,
40    {
41        let base_client = BaseClient::init_with_connector(&config, connector)?;
42        Ok(Self {
43            client: AccountClient::init(config, base_client),
44        })
45    }
46
47    /// Get an [`S2Basin`].
48    pub fn basin(&self, name: BasinName) -> S2Basin {
49        S2Basin {
50            client: self.client.basin_client(name),
51        }
52    }
53
54    /// List a page of basins.
55    ///
56    /// See [`list_all_basins`](crate::S2::list_all_basins) for automatic pagination.
57    pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
58        let response = self.client.list_basins(input.into()).await?;
59        Ok(Page::new(
60            response
61                .basins
62                .into_iter()
63                .map(Into::into)
64                .collect::<Vec<_>>(),
65            response.has_more,
66        ))
67    }
68
69    /// List all basins, paginating automatically.
70    pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
71        let s2 = self.clone();
72        let prefix = input.prefix;
73        let start_after = input.start_after;
74        let include_deleted = input.include_deleted;
75        let mut input = ListBasinsInput::new()
76            .with_prefix(prefix)
77            .with_start_after(start_after);
78        Box::pin(async_stream::try_stream! {
79            loop {
80                let page = s2.list_basins(input.clone()).await?;
81
82                let start_after = page.values.last().map(|info| info.name.clone().into());
83                for info in page.values {
84                    if !include_deleted && info.state == BasinState::Deleting {
85                        continue;
86                    }
87                    yield info;
88                }
89
90                if page.has_more && let Some(start_after) = start_after {
91                    input = input.with_start_after(start_after);
92                } else {
93                    break;
94                }
95            }
96        })
97    }
98
99    /// Create a basin.
100    pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
101        let (request, idempotency_token) = input.into();
102        let info = self.client.create_basin(request, idempotency_token).await?;
103        Ok(info.into())
104    }
105
106    /// Get basin configuration.
107    pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
108        let config = self.client.get_basin_config(name).await?;
109        Ok(config.into())
110    }
111
112    /// Delete a basin.
113    pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
114        Ok(self
115            .client
116            .delete_basin(input.name, input.ignore_not_found)
117            .await?)
118    }
119
120    /// Reconfigure a basin.
121    pub async fn reconfigure_basin(
122        &self,
123        input: ReconfigureBasinInput,
124    ) -> Result<BasinConfig, S2Error> {
125        let config = self
126            .client
127            .reconfigure_basin(input.name, input.config.into())
128            .await?;
129        Ok(config.into())
130    }
131
132    /// List a page of access tokens.
133    ///
134    /// See [`list_all_access_tokens`](crate::S2::list_all_access_tokens) for automatic pagination.
135    pub async fn list_access_tokens(
136        &self,
137        input: ListAccessTokensInput,
138    ) -> Result<Page<AccessTokenInfo>, S2Error> {
139        let response = self.client.list_access_tokens(input.into()).await?;
140        Ok(Page::new(
141            response
142                .access_tokens
143                .into_iter()
144                .map(TryInto::try_into)
145                .collect::<Result<Vec<_>, _>>()?,
146            response.has_more,
147        ))
148    }
149
150    /// List all access tokens, paginating automatically.
151    pub fn list_all_access_tokens(
152        &self,
153        input: ListAllAccessTokensInput,
154    ) -> Streaming<AccessTokenInfo> {
155        let s2 = self.clone();
156        let prefix = input.prefix;
157        let start_after = input.start_after;
158        let mut input = ListAccessTokensInput::new()
159            .with_prefix(prefix)
160            .with_start_after(start_after);
161        Box::pin(async_stream::try_stream! {
162            loop {
163                let page = s2.list_access_tokens(input.clone()).await?;
164
165                let start_after = page.values.last().map(|info| info.id.clone().into());
166                for info in page.values {
167                    yield info;
168                }
169
170                if page.has_more && let Some(start_after) = start_after {
171                    input = input.with_start_after(start_after);
172                } else {
173                    break;
174                }
175            }
176        })
177    }
178
179    /// Issue an access token.
180    pub async fn issue_access_token(
181        &self,
182        input: IssueAccessTokenInput,
183    ) -> Result<String, S2Error> {
184        let response = self.client.issue_access_token(input.into()).await?;
185        Ok(response.access_token)
186    }
187
188    /// Revoke an access token.
189    pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
190        Ok(self.client.revoke_access_token(id).await?)
191    }
192
193    /// Get account metrics.
194    pub async fn get_account_metrics(
195        &self,
196        input: GetAccountMetricsInput,
197    ) -> Result<Vec<Metric>, S2Error> {
198        let response = self.client.get_account_metrics(input.into()).await?;
199        Ok(response.values.into_iter().map(Into::into).collect())
200    }
201
202    /// Get basin metrics.
203    pub async fn get_basin_metrics(
204        &self,
205        input: GetBasinMetricsInput,
206    ) -> Result<Vec<Metric>, S2Error> {
207        let (name, request) = input.into();
208        let response = self.client.get_basin_metrics(name, request).await?;
209        Ok(response.values.into_iter().map(Into::into).collect())
210    }
211
212    /// Get stream metrics.
213    pub async fn get_stream_metrics(
214        &self,
215        input: GetStreamMetricsInput,
216    ) -> Result<Vec<Metric>, S2Error> {
217        let (basin_name, stream_name, request) = input.into();
218        let response = self
219            .client
220            .get_stream_metrics(basin_name, stream_name, request)
221            .await?;
222        Ok(response.values.into_iter().map(Into::into).collect())
223    }
224}
225
226#[derive(Debug, Clone)]
227/// A basin in an S2 account.
228///
229/// See [`S2::basin`].
230pub struct S2Basin {
231    client: BasinClient,
232}
233
234impl S2Basin {
235    /// Get an [`S2Stream`].
236    pub fn stream(&self, name: StreamName) -> S2Stream {
237        S2Stream {
238            client: self.client.clone(),
239            name,
240        }
241    }
242
243    /// List a page of streams.
244    ///
245    /// See [`list_all_streams`](crate::S2Basin::list_all_streams) for automatic pagination.
246    pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
247        let response = self.client.list_streams(input.into()).await?;
248        Ok(Page::new(
249            response
250                .streams
251                .into_iter()
252                .map(TryInto::try_into)
253                .collect::<Result<Vec<_>, _>>()?,
254            response.has_more,
255        ))
256    }
257
258    /// List all streams, paginating automatically.
259    pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
260        let basin = self.clone();
261        let prefix = input.prefix;
262        let start_after = input.start_after;
263        let include_deleted = input.include_deleted;
264        let mut input = ListStreamsInput::new()
265            .with_prefix(prefix)
266            .with_start_after(start_after);
267        Box::pin(async_stream::try_stream! {
268            loop {
269                let page = basin.list_streams(input.clone()).await?;
270
271                let start_after = page.values.last().map(|info| info.name.clone().into());
272                for info in page.values {
273                    if !include_deleted && info.deleted_at.is_some() {
274                        continue;
275                    }
276                    yield info;
277                }
278
279                if page.has_more && let Some(start_after) = start_after {
280                    input = input.with_start_after(start_after);
281                } else {
282                    break;
283                }
284            }
285        })
286    }
287
288    /// Create a stream.
289    pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
290        let (request, idempotency_token) = input.into();
291        let info = self
292            .client
293            .create_stream(request, idempotency_token)
294            .await?;
295        Ok(info.try_into()?)
296    }
297
298    /// Get stream configuration.
299    pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
300        let config = self.client.get_stream_config(name).await?;
301        Ok(config.into())
302    }
303
304    /// Delete a stream.
305    pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
306        Ok(self
307            .client
308            .delete_stream(input.name, input.ignore_not_found)
309            .await?)
310    }
311
312    /// Reconfigure a stream.
313    pub async fn reconfigure_stream(
314        &self,
315        input: ReconfigureStreamInput,
316    ) -> Result<StreamConfig, S2Error> {
317        let config = self
318            .client
319            .reconfigure_stream(input.name, input.config.into())
320            .await?;
321        Ok(config.into())
322    }
323}
324
325#[derive(Debug, Clone)]
326/// A stream in an S2 basin.
327///
328/// See [`S2Basin::stream`].
329pub struct S2Stream {
330    client: BasinClient,
331    name: StreamName,
332}
333
334impl S2Stream {
335    /// Check tail position.
336    pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
337        let response = self.client.check_tail(&self.name).await?;
338        Ok(response.tail.into())
339    }
340
341    /// Append records.
342    pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
343        let retry_enabled = self
344            .client
345            .config
346            .retry
347            .append_retry_policy
348            .is_compliant(&input);
349        let ack = self
350            .client
351            .append(&self.name, input.into(), retry_enabled)
352            .await?;
353        Ok(ack.into())
354    }
355
356    /// Read records.
357    pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
358        let batch = self
359            .client
360            .read(&self.name, input.start.into(), input.stop.into())
361            .await?;
362        Ok(ReadBatch::from_api(batch, input.ignore_command_records))
363    }
364
365    /// Create an append session for submitting [`AppendInput`]s.
366    pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
367        AppendSession::new(self.client.clone(), self.name.clone(), config)
368    }
369
370    /// Create a producer for submitting individual [`AppendRecord`](crate::types::AppendRecord)s.
371    pub fn producer(&self, config: ProducerConfig) -> Producer {
372        Producer::new(self.client.clone(), self.name.clone(), config)
373    }
374
375    /// Create a read session.
376    pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
377        let batches = session::read_session(
378            self.client.clone(),
379            self.name.clone(),
380            input.start.into(),
381            input.stop.into(),
382            input.ignore_command_records,
383        )
384        .await?;
385        Ok(Box::pin(batches.map(|res| match res {
386            Ok(batch) => Ok(batch),
387            Err(err) => Err(err.into()),
388        })))
389    }
390}