Skip to main content

s2_sdk/
ops.rs

1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5#[cfg(feature = "_hidden")]
6use crate::types::{EnsureBasinInput, EnsureStreamInput, ProvisionResult};
7use crate::{
8    api::{AccountClient, BaseClient, BasinClient},
9    producer::{Producer, ProducerConfig},
10    session::{self, AppendSession, AppendSessionConfig},
11    types::{
12        AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
13        CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, EncryptionKey,
14        GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
15        ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
16        ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
17        ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
18        StreamName, StreamPosition, Streaming,
19    },
20};
21
22#[derive(Debug, Clone)]
23/// An S2 account.
24pub struct S2 {
25    client: AccountClient,
26}
27
28impl S2 {
29    /// Create a new [`S2`].
30    pub fn new(config: S2Config) -> Result<Self, S2Error> {
31        let base_client = BaseClient::init(&config)?;
32        Ok(Self {
33            client: AccountClient::init(config, base_client),
34        })
35    }
36
37    #[doc(hidden)]
38    #[cfg(feature = "_hidden")]
39    pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
40    where
41        C: Connect + Clone + Send + Sync + 'static,
42    {
43        let base_client = BaseClient::init_with_connector(&config, connector)?;
44        Ok(Self {
45            client: AccountClient::init(config, base_client),
46        })
47    }
48
49    /// Get an [`S2Basin`].
50    pub fn basin(&self, name: BasinName) -> S2Basin {
51        S2Basin {
52            client: self.client.basin_client(name),
53        }
54    }
55
56    /// List a page of basins.
57    ///
58    /// See [`list_all_basins`](crate::S2::list_all_basins) for automatic pagination.
59    pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
60        let response = self.client.list_basins(input.into()).await?;
61        Ok(Page::new(
62            response
63                .basins
64                .into_iter()
65                .map(TryInto::try_into)
66                .collect::<Result<Vec<_>, _>>()?,
67            response.has_more,
68        ))
69    }
70
71    /// List all basins, paginating automatically.
72    pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
73        let s2 = self.clone();
74        let prefix = input.prefix;
75        let start_after = input.start_after;
76        let include_deleted = input.include_deleted;
77        let mut input = ListBasinsInput::new()
78            .with_prefix(prefix)
79            .with_start_after(start_after);
80        Box::pin(async_stream::try_stream! {
81            loop {
82                let page = s2.list_basins(input.clone()).await?;
83                let start_after = page.values.last().map(|info| info.name.clone().into());
84
85                for info in page.values {
86                    if !include_deleted && info.deleted_at.is_some() {
87                        continue;
88                    }
89                    yield info;
90                }
91
92                if page.has_more && let Some(start_after) = start_after {
93                    input = input.with_start_after(start_after);
94                } else {
95                    break;
96                }
97            }
98        })
99    }
100
101    /// Create a basin.
102    pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
103        let (request, idempotency_token) = input.into();
104        let info = self.client.create_basin(request, idempotency_token).await?;
105        Ok(info.try_into()?)
106    }
107
108    /// Ensure a basin.
109    ///
110    /// Creates the basin if it doesn't exist, or ensures its config exactly matches the
111    /// provided configuration after defaults are applied. Uses HTTP PUT semantics — always
112    /// idempotent.
113    ///
114    /// Returns [`ProvisionResult::Created`] with the basin info if the basin was newly
115    /// created, [`ProvisionResult::Updated`] if its config changed, or
116    /// [`ProvisionResult::Noop`] if no write was needed.
117    #[doc(hidden)]
118    #[cfg(feature = "_hidden")]
119    pub async fn ensure_basin(
120        &self,
121        input: EnsureBasinInput,
122    ) -> Result<ProvisionResult<BasinInfo>, S2Error> {
123        let (name, request) = input.into();
124        Ok(self
125            .client
126            .ensure_basin(name, request)
127            .await?
128            .try_map(BasinInfo::try_from)?)
129    }
130
131    /// Get basin configuration.
132    pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
133        let config = self.client.get_basin_config(name).await?;
134        Ok(config.into())
135    }
136
137    /// Delete a basin.
138    pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
139        Ok(self
140            .client
141            .delete_basin(input.name, input.ignore_not_found)
142            .await?)
143    }
144
145    /// Reconfigure a basin.
146    pub async fn reconfigure_basin(
147        &self,
148        input: ReconfigureBasinInput,
149    ) -> Result<BasinConfig, S2Error> {
150        let config = self
151            .client
152            .reconfigure_basin(input.name, input.config.into())
153            .await?;
154        Ok(config.into())
155    }
156
157    /// List a page of access tokens.
158    ///
159    /// See [`list_all_access_tokens`](crate::S2::list_all_access_tokens) for automatic pagination.
160    pub async fn list_access_tokens(
161        &self,
162        input: ListAccessTokensInput,
163    ) -> Result<Page<AccessTokenInfo>, S2Error> {
164        let response = self.client.list_access_tokens(input.into()).await?;
165        Ok(Page::new(
166            response
167                .access_tokens
168                .into_iter()
169                .map(TryInto::try_into)
170                .collect::<Result<Vec<_>, _>>()?,
171            response.has_more,
172        ))
173    }
174
175    /// List all access tokens, paginating automatically.
176    pub fn list_all_access_tokens(
177        &self,
178        input: ListAllAccessTokensInput,
179    ) -> Streaming<AccessTokenInfo> {
180        let s2 = self.clone();
181        let prefix = input.prefix;
182        let start_after = input.start_after;
183        let mut input = ListAccessTokensInput::new()
184            .with_prefix(prefix)
185            .with_start_after(start_after);
186        Box::pin(async_stream::try_stream! {
187            loop {
188                let page = s2.list_access_tokens(input.clone()).await?;
189
190                let start_after = page.values.last().map(|info| info.id.clone().into());
191                for info in page.values {
192                    yield info;
193                }
194
195                if page.has_more && let Some(start_after) = start_after {
196                    input = input.with_start_after(start_after);
197                } else {
198                    break;
199                }
200            }
201        })
202    }
203
204    /// Issue an access token.
205    pub async fn issue_access_token(
206        &self,
207        input: IssueAccessTokenInput,
208    ) -> Result<String, S2Error> {
209        let response = self.client.issue_access_token(input.into()).await?;
210        Ok(response.access_token)
211    }
212
213    /// Revoke an access token.
214    pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
215        Ok(self.client.revoke_access_token(id).await?)
216    }
217
218    /// Get account metrics.
219    pub async fn get_account_metrics(
220        &self,
221        input: GetAccountMetricsInput,
222    ) -> Result<Vec<Metric>, S2Error> {
223        let response = self.client.get_account_metrics(input.into()).await?;
224        Ok(response.values.into_iter().map(Into::into).collect())
225    }
226
227    /// Get basin metrics.
228    pub async fn get_basin_metrics(
229        &self,
230        input: GetBasinMetricsInput,
231    ) -> Result<Vec<Metric>, S2Error> {
232        let (name, request) = input.into();
233        let response = self.client.get_basin_metrics(name, request).await?;
234        Ok(response.values.into_iter().map(Into::into).collect())
235    }
236
237    /// Get stream metrics.
238    pub async fn get_stream_metrics(
239        &self,
240        input: GetStreamMetricsInput,
241    ) -> Result<Vec<Metric>, S2Error> {
242        let (basin_name, stream_name, request) = input.into();
243        let response = self
244            .client
245            .get_stream_metrics(basin_name, stream_name, request)
246            .await?;
247        Ok(response.values.into_iter().map(Into::into).collect())
248    }
249}
250
251#[derive(Debug, Clone)]
252/// A basin in an S2 account.
253///
254/// See [`S2::basin`].
255pub struct S2Basin {
256    client: BasinClient,
257}
258
259impl S2Basin {
260    /// Get an [`S2Stream`].
261    pub fn stream(&self, name: StreamName) -> S2Stream {
262        S2Stream {
263            client: self.client.clone(),
264            name,
265            encryption: None,
266        }
267    }
268
269    /// List a page of streams.
270    ///
271    /// See [`list_all_streams`](crate::S2Basin::list_all_streams) for automatic pagination.
272    pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
273        let response = self.client.list_streams(input.into()).await?;
274        Ok(Page::new(
275            response
276                .streams
277                .into_iter()
278                .map(TryInto::try_into)
279                .collect::<Result<Vec<_>, _>>()?,
280            response.has_more,
281        ))
282    }
283
284    /// List all streams, paginating automatically.
285    pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
286        let basin = self.clone();
287        let prefix = input.prefix;
288        let start_after = input.start_after;
289        let include_deleted = input.include_deleted;
290        let mut input = ListStreamsInput::new()
291            .with_prefix(prefix)
292            .with_start_after(start_after);
293        Box::pin(async_stream::try_stream! {
294            loop {
295                let page = basin.list_streams(input.clone()).await?;
296                let start_after = page.values.last().map(|info| info.name.clone().into());
297
298                for info in page.values {
299                    if !include_deleted && info.deleted_at.is_some() {
300                        continue;
301                    }
302                    yield info;
303                }
304
305                if page.has_more && let Some(start_after) = start_after {
306                    input = input.with_start_after(start_after);
307                } else {
308                    break;
309                }
310            }
311        })
312    }
313
314    /// Create a stream.
315    pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
316        let (request, idempotency_token) = input.into();
317        let info = self
318            .client
319            .create_stream(request, idempotency_token)
320            .await?;
321        Ok(info.try_into()?)
322    }
323
324    /// Ensure a stream.
325    ///
326    /// Creates the stream if it doesn't exist, or ensures its config exactly matches the provided
327    /// configuration after basin defaults and global defaults are applied. Uses HTTP PUT semantics
328    /// and is always idempotent.
329    ///
330    /// Returns [`ProvisionResult::Created`] with the stream info if the stream was newly
331    /// created, [`ProvisionResult::Updated`] if its config changed, or
332    /// [`ProvisionResult::Noop`] if no write was needed.
333    #[doc(hidden)]
334    #[cfg(feature = "_hidden")]
335    pub async fn ensure_stream(
336        &self,
337        input: EnsureStreamInput,
338    ) -> Result<ProvisionResult<StreamInfo>, S2Error> {
339        let (name, config) = input.into();
340        Ok(self
341            .client
342            .ensure_stream(name, config)
343            .await?
344            .try_map(StreamInfo::try_from)?)
345    }
346
347    /// Get stream configuration.
348    pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
349        let config = self.client.get_stream_config(name).await?;
350        Ok(config.into())
351    }
352
353    /// Delete a stream.
354    pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
355        Ok(self
356            .client
357            .delete_stream(input.name, input.ignore_not_found)
358            .await?)
359    }
360
361    /// Reconfigure a stream.
362    pub async fn reconfigure_stream(
363        &self,
364        input: ReconfigureStreamInput,
365    ) -> Result<StreamConfig, S2Error> {
366        let config = self
367            .client
368            .reconfigure_stream(input.name, input.config.into())
369            .await?;
370        Ok(config.into())
371    }
372}
373
374#[derive(Debug, Clone)]
375/// A stream in an S2 basin.
376///
377/// See [`S2Basin::stream`].
378pub struct S2Stream {
379    client: BasinClient,
380    name: StreamName,
381    encryption: Option<EncryptionKey>,
382}
383
384impl S2Stream {
385    /// Set the encryption key for this stream handle.
386    pub fn with_encryption_key(self, encryption: EncryptionKey) -> Self {
387        Self {
388            encryption: Some(encryption),
389            ..self
390        }
391    }
392
393    /// Check tail position.
394    pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
395        let response = self.client.check_tail(&self.name).await?;
396        Ok(response.tail.into())
397    }
398
399    /// Append records.
400    pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
401        let ack = self
402            .client
403            .append(
404                &self.name,
405                input.into(),
406                self.encryption.as_ref(),
407                self.client.config.retry.append_retry_policy,
408            )
409            .await?;
410        Ok(ack.into())
411    }
412
413    /// Read records.
414    pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
415        let batch = self
416            .client
417            .read(
418                &self.name,
419                input.start.into(),
420                input.stop.into(),
421                self.encryption.as_ref(),
422            )
423            .await?;
424        let mut batch = ReadBatch::from_api(batch);
425        if input.ignore_command_records {
426            batch.records.retain(|r| !r.is_command_record());
427        }
428        Ok(batch)
429    }
430
431    /// Create an append session for submitting [`AppendInput`]s.
432    pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
433        AppendSession::new(
434            self.client.clone(),
435            self.name.clone(),
436            self.encryption.clone(),
437            config,
438        )
439    }
440
441    /// Create a producer for submitting individual [`AppendRecord`](crate::types::AppendRecord)s.
442    pub fn producer(&self, config: ProducerConfig) -> Producer {
443        Producer::new(
444            self.client.clone(),
445            self.name.clone(),
446            self.encryption.clone(),
447            config,
448        )
449    }
450
451    /// Create a read session.
452    pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
453        let batches = session::read_session(
454            self.client.clone(),
455            self.name.clone(),
456            self.encryption.clone(),
457            input.start.into(),
458            input.stop.into(),
459            input.ignore_command_records,
460        )
461        .await?;
462        Ok(Box::pin(batches.map(|res| match res {
463            Ok(batch) => Ok(batch),
464            Err(err) => Err(err.into()),
465        })))
466    }
467}