1use futures::StreamExt;
2
3#[cfg(feature = "_hidden")]
4use crate::client::Connect;
5use crate::{
6 api::{AccountClient, BaseClient, BasinClient},
7 producer::{Producer, ProducerConfig},
8 session::{self, AppendSession, AppendSessionConfig},
9 types::{
10 AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
11 BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput,
12 GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
13 ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
14 ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
15 ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
16 StreamName, StreamPosition, Streaming,
17 },
18};
19
20#[derive(Debug, Clone)]
21pub struct S2 {
23 client: AccountClient,
24}
25
26impl S2 {
27 pub fn new(config: S2Config) -> Result<Self, S2Error> {
29 let base_client = BaseClient::init(&config)?;
30 Ok(Self {
31 client: AccountClient::init(config, base_client),
32 })
33 }
34
35 #[doc(hidden)]
36 #[cfg(feature = "_hidden")]
37 pub fn new_with_connector<C>(config: S2Config, connector: C) -> Result<Self, S2Error>
38 where
39 C: Connect + Clone + Send + Sync + 'static,
40 {
41 let base_client = BaseClient::init_with_connector(&config, connector)?;
42 Ok(Self {
43 client: AccountClient::init(config, base_client),
44 })
45 }
46
47 pub fn basin(&self, name: BasinName) -> S2Basin {
49 S2Basin {
50 client: self.client.basin_client(name),
51 }
52 }
53
54 pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
58 let response = self.client.list_basins(input.into()).await?;
59 Ok(Page::new(
60 response
61 .basins
62 .into_iter()
63 .map(Into::into)
64 .collect::<Vec<_>>(),
65 response.has_more,
66 ))
67 }
68
69 pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
71 let s2 = self.clone();
72 let prefix = input.prefix;
73 let start_after = input.start_after;
74 let include_deleted = input.include_deleted;
75 let mut input = ListBasinsInput::new()
76 .with_prefix(prefix)
77 .with_start_after(start_after);
78 Box::pin(async_stream::try_stream! {
79 loop {
80 let page = s2.list_basins(input.clone()).await?;
81
82 let start_after = page.values.last().map(|info| info.name.clone().into());
83 for info in page.values {
84 if !include_deleted && info.state == BasinState::Deleting {
85 continue;
86 }
87 yield info;
88 }
89
90 if page.has_more && let Some(start_after) = start_after {
91 input = input.with_start_after(start_after);
92 } else {
93 break;
94 }
95 }
96 })
97 }
98
99 pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
101 let (request, idempotency_token) = input.into();
102 let info = self.client.create_basin(request, idempotency_token).await?;
103 Ok(info.into())
104 }
105
106 pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
108 let config = self.client.get_basin_config(name).await?;
109 Ok(config.into())
110 }
111
112 pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
114 Ok(self
115 .client
116 .delete_basin(input.name, input.ignore_not_found)
117 .await?)
118 }
119
120 pub async fn reconfigure_basin(
122 &self,
123 input: ReconfigureBasinInput,
124 ) -> Result<BasinConfig, S2Error> {
125 let config = self
126 .client
127 .reconfigure_basin(input.name, input.config.into())
128 .await?;
129 Ok(config.into())
130 }
131
132 pub async fn list_access_tokens(
136 &self,
137 input: ListAccessTokensInput,
138 ) -> Result<Page<AccessTokenInfo>, S2Error> {
139 let response = self.client.list_access_tokens(input.into()).await?;
140 Ok(Page::new(
141 response
142 .access_tokens
143 .into_iter()
144 .map(TryInto::try_into)
145 .collect::<Result<Vec<_>, _>>()?,
146 response.has_more,
147 ))
148 }
149
150 pub fn list_all_access_tokens(
152 &self,
153 input: ListAllAccessTokensInput,
154 ) -> Streaming<AccessTokenInfo> {
155 let s2 = self.clone();
156 let prefix = input.prefix;
157 let start_after = input.start_after;
158 let mut input = ListAccessTokensInput::new()
159 .with_prefix(prefix)
160 .with_start_after(start_after);
161 Box::pin(async_stream::try_stream! {
162 loop {
163 let page = s2.list_access_tokens(input.clone()).await?;
164
165 let start_after = page.values.last().map(|info| info.id.clone().into());
166 for info in page.values {
167 yield info;
168 }
169
170 if page.has_more && let Some(start_after) = start_after {
171 input = input.with_start_after(start_after);
172 } else {
173 break;
174 }
175 }
176 })
177 }
178
179 pub async fn issue_access_token(
181 &self,
182 input: IssueAccessTokenInput,
183 ) -> Result<String, S2Error> {
184 let response = self.client.issue_access_token(input.into()).await?;
185 Ok(response.access_token)
186 }
187
188 pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
190 Ok(self.client.revoke_access_token(id).await?)
191 }
192
193 pub async fn get_account_metrics(
195 &self,
196 input: GetAccountMetricsInput,
197 ) -> Result<Vec<Metric>, S2Error> {
198 let response = self.client.get_account_metrics(input.into()).await?;
199 Ok(response.values.into_iter().map(Into::into).collect())
200 }
201
202 pub async fn get_basin_metrics(
204 &self,
205 input: GetBasinMetricsInput,
206 ) -> Result<Vec<Metric>, S2Error> {
207 let (name, request) = input.into();
208 let response = self.client.get_basin_metrics(name, request).await?;
209 Ok(response.values.into_iter().map(Into::into).collect())
210 }
211
212 pub async fn get_stream_metrics(
214 &self,
215 input: GetStreamMetricsInput,
216 ) -> Result<Vec<Metric>, S2Error> {
217 let (basin_name, stream_name, request) = input.into();
218 let response = self
219 .client
220 .get_stream_metrics(basin_name, stream_name, request)
221 .await?;
222 Ok(response.values.into_iter().map(Into::into).collect())
223 }
224}
225
226#[derive(Debug, Clone)]
227pub struct S2Basin {
231 client: BasinClient,
232}
233
234impl S2Basin {
235 pub fn stream(&self, name: StreamName) -> S2Stream {
237 S2Stream {
238 client: self.client.clone(),
239 name,
240 }
241 }
242
243 pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
247 let response = self.client.list_streams(input.into()).await?;
248 Ok(Page::new(
249 response
250 .streams
251 .into_iter()
252 .map(TryInto::try_into)
253 .collect::<Result<Vec<_>, _>>()?,
254 response.has_more,
255 ))
256 }
257
258 pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
260 let basin = self.clone();
261 let prefix = input.prefix;
262 let start_after = input.start_after;
263 let include_deleted = input.include_deleted;
264 let mut input = ListStreamsInput::new()
265 .with_prefix(prefix)
266 .with_start_after(start_after);
267 Box::pin(async_stream::try_stream! {
268 loop {
269 let page = basin.list_streams(input.clone()).await?;
270
271 let start_after = page.values.last().map(|info| info.name.clone().into());
272 for info in page.values {
273 if !include_deleted && info.deleted_at.is_some() {
274 continue;
275 }
276 yield info;
277 }
278
279 if page.has_more && let Some(start_after) = start_after {
280 input = input.with_start_after(start_after);
281 } else {
282 break;
283 }
284 }
285 })
286 }
287
288 pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
290 let (request, idempotency_token) = input.into();
291 let info = self
292 .client
293 .create_stream(request, idempotency_token)
294 .await?;
295 Ok(info.try_into()?)
296 }
297
298 pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
300 let config = self.client.get_stream_config(name).await?;
301 Ok(config.into())
302 }
303
304 pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
306 Ok(self
307 .client
308 .delete_stream(input.name, input.ignore_not_found)
309 .await?)
310 }
311
312 pub async fn reconfigure_stream(
314 &self,
315 input: ReconfigureStreamInput,
316 ) -> Result<StreamConfig, S2Error> {
317 let config = self
318 .client
319 .reconfigure_stream(input.name, input.config.into())
320 .await?;
321 Ok(config.into())
322 }
323}
324
325#[derive(Debug, Clone)]
326pub struct S2Stream {
330 client: BasinClient,
331 name: StreamName,
332}
333
334impl S2Stream {
335 pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
337 let response = self.client.check_tail(&self.name).await?;
338 Ok(response.tail.into())
339 }
340
341 pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
343 let retry_enabled = self
344 .client
345 .config
346 .retry
347 .append_retry_policy
348 .is_compliant(&input);
349 let ack = self
350 .client
351 .append(&self.name, input.into(), retry_enabled)
352 .await?;
353 Ok(ack.into())
354 }
355
356 pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
358 let batch = self
359 .client
360 .read(&self.name, input.start.into(), input.stop.into())
361 .await?;
362 Ok(ReadBatch::from_api(batch, input.ignore_command_records))
363 }
364
365 pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
367 AppendSession::new(self.client.clone(), self.name.clone(), config)
368 }
369
370 pub fn producer(&self, config: ProducerConfig) -> Producer {
372 Producer::new(self.client.clone(), self.name.clone(), config)
373 }
374
375 pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
377 let batches = session::read_session(
378 self.client.clone(),
379 self.name.clone(),
380 input.start.into(),
381 input.stop.into(),
382 input.ignore_command_records,
383 )
384 .await?;
385 Ok(Box::pin(batches.map(|res| match res {
386 Ok(batch) => Ok(batch),
387 Err(err) => Err(err.into()),
388 })))
389 }
390}