Skip to main content

s2_sdk/
ops.rs

1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5#[cfg(feature = "_hidden")]
6use crate::types::{
7    CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, CreateOrReconfigured,
8};
9use crate::{
10    api::{AccountClient, BaseClient, BasinClient},
11    producer::{Producer, ProducerConfig},
12    session::{self, AppendSession, AppendSessionConfig},
13    types::{
14        AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
15        BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput,
16        GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
17        ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
18        ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
19        ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
20        StreamName, StreamPosition, Streaming,
21    },
22};
23
24#[derive(Debug, Clone)]
25/// An S2 account.
26pub struct S2 {
27    client: AccountClient,
28}
29
30impl S2 {
31    /// Create a new [`S2`].
32    pub fn new(config: S2Config) -> Result<Self, S2Error> {
33        let base_client = BaseClient::init(&config)?;
34        Ok(Self {
35            client: AccountClient::init(config, base_client),
36        })
37    }
38
39    #[doc(hidden)]
40    #[cfg(feature = "_hidden")]
41    pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
42    where
43        C: Connect + Clone + Send + Sync + 'static,
44    {
45        let base_client = BaseClient::init_with_connector(&config, connector)?;
46        Ok(Self {
47            client: AccountClient::init(config, base_client),
48        })
49    }
50
51    /// Get an [`S2Basin`].
52    pub fn basin(&self, name: BasinName) -> S2Basin {
53        S2Basin {
54            client: self.client.basin_client(name),
55        }
56    }
57
58    /// List a page of basins.
59    ///
60    /// See [`list_all_basins`](crate::S2::list_all_basins) for automatic pagination.
61    pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
62        let response = self.client.list_basins(input.into()).await?;
63        Ok(Page::new(
64            response
65                .basins
66                .into_iter()
67                .map(Into::into)
68                .collect::<Vec<_>>(),
69            response.has_more,
70        ))
71    }
72
73    /// List all basins, paginating automatically.
74    pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
75        let s2 = self.clone();
76        let prefix = input.prefix;
77        let start_after = input.start_after;
78        let include_deleted = input.include_deleted;
79        let mut input = ListBasinsInput::new()
80            .with_prefix(prefix)
81            .with_start_after(start_after);
82        Box::pin(async_stream::try_stream! {
83            loop {
84                let page = s2.list_basins(input.clone()).await?;
85
86                let start_after = page.values.last().map(|info| info.name.clone().into());
87                for info in page.values {
88                    if !include_deleted && info.state == BasinState::Deleting {
89                        continue;
90                    }
91                    yield info;
92                }
93
94                if page.has_more && let Some(start_after) = start_after {
95                    input = input.with_start_after(start_after);
96                } else {
97                    break;
98                }
99            }
100        })
101    }
102
103    /// Create a basin.
104    pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
105        let (request, idempotency_token) = input.into();
106        let info = self.client.create_basin(request, idempotency_token).await?;
107        Ok(info.into())
108    }
109
110    /// Create or reconfigure a basin.
111    ///
112    /// Creates the basin if it doesn't exist, or reconfigures it to match the provided
113    /// configuration if it does. Uses HTTP PUT semantics — always idempotent.
114    ///
115    /// Returns [`CreateOrReconfigured::Created`] with the basin info if the basin was newly
116    /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed.
117    #[doc(hidden)]
118    #[cfg(feature = "_hidden")]
119    pub async fn create_or_reconfigure_basin(
120        &self,
121        input: CreateOrReconfigureBasinInput,
122    ) -> Result<CreateOrReconfigured<BasinInfo>, S2Error> {
123        let (name, request) = input.into();
124        let (was_created, info) = self
125            .client
126            .create_or_reconfigure_basin(name, request)
127            .await?;
128        let info = info.into();
129        Ok(if was_created {
130            CreateOrReconfigured::Created(info)
131        } else {
132            CreateOrReconfigured::Reconfigured(info)
133        })
134    }
135
136    /// Get basin configuration.
137    pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
138        let config = self.client.get_basin_config(name).await?;
139        Ok(config.into())
140    }
141
142    /// Delete a basin.
143    pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
144        Ok(self
145            .client
146            .delete_basin(input.name, input.ignore_not_found)
147            .await?)
148    }
149
150    /// Reconfigure a basin.
151    pub async fn reconfigure_basin(
152        &self,
153        input: ReconfigureBasinInput,
154    ) -> Result<BasinConfig, S2Error> {
155        let config = self
156            .client
157            .reconfigure_basin(input.name, input.config.into())
158            .await?;
159        Ok(config.into())
160    }
161
162    /// List a page of access tokens.
163    ///
164    /// See [`list_all_access_tokens`](crate::S2::list_all_access_tokens) for automatic pagination.
165    pub async fn list_access_tokens(
166        &self,
167        input: ListAccessTokensInput,
168    ) -> Result<Page<AccessTokenInfo>, S2Error> {
169        let response = self.client.list_access_tokens(input.into()).await?;
170        Ok(Page::new(
171            response
172                .access_tokens
173                .into_iter()
174                .map(TryInto::try_into)
175                .collect::<Result<Vec<_>, _>>()?,
176            response.has_more,
177        ))
178    }
179
180    /// List all access tokens, paginating automatically.
181    pub fn list_all_access_tokens(
182        &self,
183        input: ListAllAccessTokensInput,
184    ) -> Streaming<AccessTokenInfo> {
185        let s2 = self.clone();
186        let prefix = input.prefix;
187        let start_after = input.start_after;
188        let mut input = ListAccessTokensInput::new()
189            .with_prefix(prefix)
190            .with_start_after(start_after);
191        Box::pin(async_stream::try_stream! {
192            loop {
193                let page = s2.list_access_tokens(input.clone()).await?;
194
195                let start_after = page.values.last().map(|info| info.id.clone().into());
196                for info in page.values {
197                    yield info;
198                }
199
200                if page.has_more && let Some(start_after) = start_after {
201                    input = input.with_start_after(start_after);
202                } else {
203                    break;
204                }
205            }
206        })
207    }
208
209    /// Issue an access token.
210    pub async fn issue_access_token(
211        &self,
212        input: IssueAccessTokenInput,
213    ) -> Result<String, S2Error> {
214        let response = self.client.issue_access_token(input.into()).await?;
215        Ok(response.access_token)
216    }
217
218    /// Revoke an access token.
219    pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
220        Ok(self.client.revoke_access_token(id).await?)
221    }
222
223    /// Get account metrics.
224    pub async fn get_account_metrics(
225        &self,
226        input: GetAccountMetricsInput,
227    ) -> Result<Vec<Metric>, S2Error> {
228        let response = self.client.get_account_metrics(input.into()).await?;
229        Ok(response.values.into_iter().map(Into::into).collect())
230    }
231
232    /// Get basin metrics.
233    pub async fn get_basin_metrics(
234        &self,
235        input: GetBasinMetricsInput,
236    ) -> Result<Vec<Metric>, S2Error> {
237        let (name, request) = input.into();
238        let response = self.client.get_basin_metrics(name, request).await?;
239        Ok(response.values.into_iter().map(Into::into).collect())
240    }
241
242    /// Get stream metrics.
243    pub async fn get_stream_metrics(
244        &self,
245        input: GetStreamMetricsInput,
246    ) -> Result<Vec<Metric>, S2Error> {
247        let (basin_name, stream_name, request) = input.into();
248        let response = self
249            .client
250            .get_stream_metrics(basin_name, stream_name, request)
251            .await?;
252        Ok(response.values.into_iter().map(Into::into).collect())
253    }
254}
255
256#[derive(Debug, Clone)]
257/// A basin in an S2 account.
258///
259/// See [`S2::basin`].
260pub struct S2Basin {
261    client: BasinClient,
262}
263
264impl S2Basin {
265    /// Get an [`S2Stream`].
266    pub fn stream(&self, name: StreamName) -> S2Stream {
267        S2Stream {
268            client: self.client.clone(),
269            name,
270        }
271    }
272
273    /// List a page of streams.
274    ///
275    /// See [`list_all_streams`](crate::S2Basin::list_all_streams) for automatic pagination.
276    pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
277        let response = self.client.list_streams(input.into()).await?;
278        Ok(Page::new(
279            response
280                .streams
281                .into_iter()
282                .map(TryInto::try_into)
283                .collect::<Result<Vec<_>, _>>()?,
284            response.has_more,
285        ))
286    }
287
288    /// List all streams, paginating automatically.
289    pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
290        let basin = self.clone();
291        let prefix = input.prefix;
292        let start_after = input.start_after;
293        let include_deleted = input.include_deleted;
294        let mut input = ListStreamsInput::new()
295            .with_prefix(prefix)
296            .with_start_after(start_after);
297        Box::pin(async_stream::try_stream! {
298            loop {
299                let page = basin.list_streams(input.clone()).await?;
300
301                let start_after = page.values.last().map(|info| info.name.clone().into());
302                for info in page.values {
303                    if !include_deleted && info.deleted_at.is_some() {
304                        continue;
305                    }
306                    yield info;
307                }
308
309                if page.has_more && let Some(start_after) = start_after {
310                    input = input.with_start_after(start_after);
311                } else {
312                    break;
313                }
314            }
315        })
316    }
317
318    /// Create a stream.
319    pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
320        let (request, idempotency_token) = input.into();
321        let info = self
322            .client
323            .create_stream(request, idempotency_token)
324            .await?;
325        Ok(info.try_into()?)
326    }
327
328    /// Create or reconfigure a stream.
329    ///
330    /// Creates the stream if it doesn't exist, or reconfigures it to match the provided
331    /// configuration if it does. Uses HTTP PUT semantics — always idempotent.
332    ///
333    /// Returns [`CreateOrReconfigured::Created`] with the stream info if the stream was newly
334    /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed.
335    #[doc(hidden)]
336    #[cfg(feature = "_hidden")]
337    pub async fn create_or_reconfigure_stream(
338        &self,
339        input: CreateOrReconfigureStreamInput,
340    ) -> Result<CreateOrReconfigured<StreamInfo>, S2Error> {
341        let (name, config) = input.into();
342        let (was_created, info) = self
343            .client
344            .create_or_reconfigure_stream(name, config)
345            .await?;
346        let info = info.try_into()?;
347        Ok(if was_created {
348            CreateOrReconfigured::Created(info)
349        } else {
350            CreateOrReconfigured::Reconfigured(info)
351        })
352    }
353
354    /// Get stream configuration.
355    pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
356        let config = self.client.get_stream_config(name).await?;
357        Ok(config.into())
358    }
359
360    /// Delete a stream.
361    pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
362        Ok(self
363            .client
364            .delete_stream(input.name, input.ignore_not_found)
365            .await?)
366    }
367
368    /// Reconfigure a stream.
369    pub async fn reconfigure_stream(
370        &self,
371        input: ReconfigureStreamInput,
372    ) -> Result<StreamConfig, S2Error> {
373        let config = self
374            .client
375            .reconfigure_stream(input.name, input.config.into())
376            .await?;
377        Ok(config.into())
378    }
379}
380
381#[derive(Debug, Clone)]
382/// A stream in an S2 basin.
383///
384/// See [`S2Basin::stream`].
385pub struct S2Stream {
386    client: BasinClient,
387    name: StreamName,
388}
389
390impl S2Stream {
391    /// Check tail position.
392    pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
393        let response = self.client.check_tail(&self.name).await?;
394        Ok(response.tail.into())
395    }
396
397    /// Append records.
398    pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
399        let ack = self
400            .client
401            .append(
402                &self.name,
403                input.into(),
404                self.client.config.retry.append_retry_policy,
405            )
406            .await?;
407        Ok(ack.into())
408    }
409
410    /// Read records.
411    pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
412        let batch = self
413            .client
414            .read(&self.name, input.start.into(), input.stop.into())
415            .await?;
416        Ok(ReadBatch::from_api(batch, input.ignore_command_records))
417    }
418
419    /// Create an append session for submitting [`AppendInput`]s.
420    pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
421        AppendSession::new(self.client.clone(), self.name.clone(), config)
422    }
423
424    /// Create a producer for submitting individual [`AppendRecord`](crate::types::AppendRecord)s.
425    pub fn producer(&self, config: ProducerConfig) -> Producer {
426        Producer::new(self.client.clone(), self.name.clone(), config)
427    }
428
429    /// Create a read session.
430    pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
431        let batches = session::read_session(
432            self.client.clone(),
433            self.name.clone(),
434            input.start.into(),
435            input.stop.into(),
436            input.ignore_command_records,
437        )
438        .await?;
439        Ok(Box::pin(batches.map(|res| match res {
440            Ok(batch) => Ok(batch),
441            Err(err) => Err(err.into()),
442        })))
443    }
444}