Skip to main content

s2_sdk/
ops.rs

1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5#[cfg(feature = "_hidden")]
6use crate::types::{
7    CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, CreateOrReconfigured,
8};
9use crate::{
10    api::{AccountClient, BaseClient, BasinClient},
11    producer::{Producer, ProducerConfig},
12    session::{self, AppendSession, AppendSessionConfig},
13    types::{
14        AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
15        CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionSpec,
16        GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
17        ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
18        ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
19        ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
20        StreamName, StreamPosition, Streaming,
21    },
22};
23
24#[derive(Debug, Clone)]
25/// An S2 account.
26pub struct S2 {
27    client: AccountClient,
28}
29
30impl S2 {
31    /// Create a new [`S2`].
32    pub fn new(config: S2Config) -> Result<Self, S2Error> {
33        let base_client = BaseClient::init(&config)?;
34        Ok(Self {
35            client: AccountClient::init(config, base_client),
36        })
37    }
38
39    #[doc(hidden)]
40    #[cfg(feature = "_hidden")]
41    pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
42    where
43        C: Connect + Clone + Send + Sync + 'static,
44    {
45        let base_client = BaseClient::init_with_connector(&config, connector)?;
46        Ok(Self {
47            client: AccountClient::init(config, base_client),
48        })
49    }
50
51    /// Get an [`S2Basin`].
52    pub fn basin(&self, name: BasinName) -> S2Basin {
53        S2Basin {
54            client: self.client.basin_client(name),
55        }
56    }
57
58    /// List a page of basins.
59    ///
60    /// See [`list_all_basins`](crate::S2::list_all_basins) for automatic pagination.
61    pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
62        let response = self.client.list_basins(input.into()).await?;
63        Ok(Page::new(
64            response
65                .basins
66                .into_iter()
67                .map(TryInto::try_into)
68                .collect::<Result<Vec<_>, _>>()?,
69            response.has_more,
70        ))
71    }
72
73    /// List all basins, paginating automatically.
74    pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
75        let s2 = self.clone();
76        let prefix = input.prefix;
77        let start_after = input.start_after;
78        let include_deleted = input.include_deleted;
79        let mut input = ListBasinsInput::new()
80            .with_prefix(prefix)
81            .with_start_after(start_after);
82        Box::pin(async_stream::try_stream! {
83            loop {
84                let page = s2.list_basins(input.clone()).await?;
85                let start_after = page.values.last().map(|info| info.name.clone().into());
86
87                for info in page.values {
88                    if !include_deleted && info.deleted_at.is_some() {
89                        continue;
90                    }
91                    yield info;
92                }
93
94                if page.has_more && let Some(start_after) = start_after {
95                    input = input.with_start_after(start_after);
96                } else {
97                    break;
98                }
99            }
100        })
101    }
102
103    /// Create a basin.
104    pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
105        let (request, idempotency_token) = input.into();
106        let info = self.client.create_basin(request, idempotency_token).await?;
107        Ok(info.try_into()?)
108    }
109
110    /// Create or reconfigure a basin.
111    ///
112    /// Creates the basin if it doesn't exist, or reconfigures it to match the provided
113    /// configuration if it does. Uses HTTP PUT semantics — always idempotent.
114    ///
115    /// Returns [`CreateOrReconfigured::Created`] with the basin info if the basin was newly
116    /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed.
117    #[doc(hidden)]
118    #[cfg(feature = "_hidden")]
119    pub async fn create_or_reconfigure_basin(
120        &self,
121        input: CreateOrReconfigureBasinInput,
122    ) -> Result<CreateOrReconfigured<BasinInfo>, S2Error> {
123        let (name, request) = input.into();
124        let (was_created, info) = self
125            .client
126            .create_or_reconfigure_basin(name, request)
127            .await?;
128        let info = info.try_into()?;
129        Ok(if was_created {
130            CreateOrReconfigured::Created(info)
131        } else {
132            CreateOrReconfigured::Reconfigured(info)
133        })
134    }
135
136    /// Get basin configuration.
137    pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
138        let config = self.client.get_basin_config(name).await?;
139        Ok(config.into())
140    }
141
142    /// Delete a basin.
143    pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
144        Ok(self
145            .client
146            .delete_basin(input.name, input.ignore_not_found)
147            .await?)
148    }
149
150    /// Reconfigure a basin.
151    pub async fn reconfigure_basin(
152        &self,
153        input: ReconfigureBasinInput,
154    ) -> Result<BasinConfig, S2Error> {
155        let config = self
156            .client
157            .reconfigure_basin(input.name, input.config.into())
158            .await?;
159        Ok(config.into())
160    }
161
162    /// List a page of access tokens.
163    ///
164    /// See [`list_all_access_tokens`](crate::S2::list_all_access_tokens) for automatic pagination.
165    pub async fn list_access_tokens(
166        &self,
167        input: ListAccessTokensInput,
168    ) -> Result<Page<AccessTokenInfo>, S2Error> {
169        let response = self.client.list_access_tokens(input.into()).await?;
170        Ok(Page::new(
171            response
172                .access_tokens
173                .into_iter()
174                .map(TryInto::try_into)
175                .collect::<Result<Vec<_>, _>>()?,
176            response.has_more,
177        ))
178    }
179
180    /// List all access tokens, paginating automatically.
181    pub fn list_all_access_tokens(
182        &self,
183        input: ListAllAccessTokensInput,
184    ) -> Streaming<AccessTokenInfo> {
185        let s2 = self.clone();
186        let prefix = input.prefix;
187        let start_after = input.start_after;
188        let mut input = ListAccessTokensInput::new()
189            .with_prefix(prefix)
190            .with_start_after(start_after);
191        Box::pin(async_stream::try_stream! {
192            loop {
193                let page = s2.list_access_tokens(input.clone()).await?;
194
195                let start_after = page.values.last().map(|info| info.id.clone().into());
196                for info in page.values {
197                    yield info;
198                }
199
200                if page.has_more && let Some(start_after) = start_after {
201                    input = input.with_start_after(start_after);
202                } else {
203                    break;
204                }
205            }
206        })
207    }
208
209    /// Issue an access token.
210    pub async fn issue_access_token(
211        &self,
212        input: IssueAccessTokenInput,
213    ) -> Result<String, S2Error> {
214        let response = self.client.issue_access_token(input.into()).await?;
215        Ok(response.access_token)
216    }
217
218    /// Revoke an access token.
219    pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
220        Ok(self.client.revoke_access_token(id).await?)
221    }
222
223    /// Get account metrics.
224    pub async fn get_account_metrics(
225        &self,
226        input: GetAccountMetricsInput,
227    ) -> Result<Vec<Metric>, S2Error> {
228        let response = self.client.get_account_metrics(input.into()).await?;
229        Ok(response.values.into_iter().map(Into::into).collect())
230    }
231
232    /// Get basin metrics.
233    pub async fn get_basin_metrics(
234        &self,
235        input: GetBasinMetricsInput,
236    ) -> Result<Vec<Metric>, S2Error> {
237        let (name, request) = input.into();
238        let response = self.client.get_basin_metrics(name, request).await?;
239        Ok(response.values.into_iter().map(Into::into).collect())
240    }
241
242    /// Get stream metrics.
243    pub async fn get_stream_metrics(
244        &self,
245        input: GetStreamMetricsInput,
246    ) -> Result<Vec<Metric>, S2Error> {
247        let (basin_name, stream_name, request) = input.into();
248        let response = self
249            .client
250            .get_stream_metrics(basin_name, stream_name, request)
251            .await?;
252        Ok(response.values.into_iter().map(Into::into).collect())
253    }
254}
255
256#[derive(Debug, Clone)]
257/// A basin in an S2 account.
258///
259/// See [`S2::basin`].
260pub struct S2Basin {
261    client: BasinClient,
262}
263
264impl S2Basin {
265    /// Get an [`S2Stream`].
266    pub fn stream(&self, name: StreamName) -> S2Stream {
267        S2Stream {
268            client: self.client.clone(),
269            name,
270            encryption: None,
271        }
272    }
273
274    /// List a page of streams.
275    ///
276    /// See [`list_all_streams`](crate::S2Basin::list_all_streams) for automatic pagination.
277    pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
278        let response = self.client.list_streams(input.into()).await?;
279        Ok(Page::new(
280            response
281                .streams
282                .into_iter()
283                .map(TryInto::try_into)
284                .collect::<Result<Vec<_>, _>>()?,
285            response.has_more,
286        ))
287    }
288
289    /// List all streams, paginating automatically.
290    pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
291        let basin = self.clone();
292        let prefix = input.prefix;
293        let start_after = input.start_after;
294        let include_deleted = input.include_deleted;
295        let mut input = ListStreamsInput::new()
296            .with_prefix(prefix)
297            .with_start_after(start_after);
298        Box::pin(async_stream::try_stream! {
299            loop {
300                let page = basin.list_streams(input.clone()).await?;
301                let start_after = page.values.last().map(|info| info.name.clone().into());
302
303                for info in page.values {
304                    if !include_deleted && info.deleted_at.is_some() {
305                        continue;
306                    }
307                    yield info;
308                }
309
310                if page.has_more && let Some(start_after) = start_after {
311                    input = input.with_start_after(start_after);
312                } else {
313                    break;
314                }
315            }
316        })
317    }
318
319    /// Create a stream.
320    pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
321        let (request, idempotency_token) = input.into();
322        let info = self
323            .client
324            .create_stream(request, idempotency_token)
325            .await?;
326        Ok(info.try_into()?)
327    }
328
329    /// Create or reconfigure a stream.
330    ///
331    /// Creates the stream if it doesn't exist, or reconfigures it to match the provided
332    /// configuration if it does. Uses HTTP PUT semantics — always idempotent.
333    ///
334    /// Returns [`CreateOrReconfigured::Created`] with the stream info if the stream was newly
335    /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed.
336    #[doc(hidden)]
337    #[cfg(feature = "_hidden")]
338    pub async fn create_or_reconfigure_stream(
339        &self,
340        input: CreateOrReconfigureStreamInput,
341    ) -> Result<CreateOrReconfigured<StreamInfo>, S2Error> {
342        let (name, config) = input.into();
343        let (was_created, info) = self
344            .client
345            .create_or_reconfigure_stream(name, config)
346            .await?;
347        let info = info.try_into()?;
348        Ok(if was_created {
349            CreateOrReconfigured::Created(info)
350        } else {
351            CreateOrReconfigured::Reconfigured(info)
352        })
353    }
354
355    /// Get stream configuration.
356    pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
357        let config = self.client.get_stream_config(name).await?;
358        Ok(config.into())
359    }
360
361    /// Delete a stream.
362    pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
363        Ok(self
364            .client
365            .delete_stream(input.name, input.ignore_not_found)
366            .await?)
367    }
368
369    /// Reconfigure a stream.
370    pub async fn reconfigure_stream(
371        &self,
372        input: ReconfigureStreamInput,
373    ) -> Result<StreamConfig, S2Error> {
374        let config = self
375            .client
376            .reconfigure_stream(input.name, input.config.into())
377            .await?;
378        Ok(config.into())
379    }
380}
381
382#[derive(Debug, Clone)]
383/// A stream in an S2 basin.
384///
385/// See [`S2Basin::stream`].
386pub struct S2Stream {
387    client: BasinClient,
388    name: StreamName,
389    encryption: Option<EncryptionSpec>,
390}
391
392impl S2Stream {
393    /// Set the encryption spec for this stream handle.
394    pub fn with_encryption(self, encryption: EncryptionSpec) -> Self {
395        Self {
396            encryption: Some(encryption),
397            ..self
398        }
399    }
400
401    /// Check tail position.
402    pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
403        let response = self.client.check_tail(&self.name).await?;
404        Ok(response.tail.into())
405    }
406
407    /// Append records.
408    pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
409        let ack = self
410            .client
411            .append(
412                &self.name,
413                input.into(),
414                self.encryption.as_ref(),
415                self.client.config.retry.append_retry_policy,
416            )
417            .await?;
418        Ok(ack.into())
419    }
420
421    /// Read records.
422    pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
423        let batch = self
424            .client
425            .read(
426                &self.name,
427                input.start.into(),
428                input.stop.into(),
429                self.encryption.as_ref(),
430            )
431            .await?;
432        let mut batch = ReadBatch::from_api(batch);
433        if input.ignore_command_records {
434            batch.records.retain(|r| !r.is_command_record());
435        }
436        Ok(batch)
437    }
438
439    /// Create an append session for submitting [`AppendInput`]s.
440    pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
441        AppendSession::new(
442            self.client.clone(),
443            self.name.clone(),
444            self.encryption.clone(),
445            config,
446        )
447    }
448
449    /// Create a producer for submitting individual [`AppendRecord`](crate::types::AppendRecord)s.
450    pub fn producer(&self, config: ProducerConfig) -> Producer {
451        Producer::new(
452            self.client.clone(),
453            self.name.clone(),
454            self.encryption.clone(),
455            config,
456        )
457    }
458
459    /// Create a read session.
460    pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
461        let batches = session::read_session(
462            self.client.clone(),
463            self.name.clone(),
464            self.encryption.clone(),
465            input.start.into(),
466            input.stop.into(),
467            input.ignore_command_records,
468        )
469        .await?;
470        Ok(Box::pin(batches.map(|res| match res {
471            Ok(batch) => Ok(batch),
472            Err(err) => Err(err.into()),
473        })))
474    }
475}