1use futures::StreamExt;
2
3use crate::{
4 api::{AccountClient, BasinClient},
5 producer::{Producer, ProducerConfig},
6 session::{self, AppendSession, AppendSessionConfig},
7 types::{
8 AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName,
9 BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput,
10 GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput,
11 ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput,
12 ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput,
13 ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo,
14 StreamName, StreamPosition, Streaming,
15 },
16};
17
18#[derive(Debug, Clone)]
19pub struct S2 {
21 client: AccountClient,
22}
23
24impl S2 {
25 pub fn new(config: S2Config) -> Result<Self, S2Error> {
27 Ok(Self {
28 client: AccountClient::init(config)?,
29 })
30 }
31
32 pub fn basin(&self, name: BasinName) -> S2Basin {
34 S2Basin {
35 client: self.client.basin_client(name),
36 }
37 }
38
39 pub async fn list_basins(&self, input: ListBasinsInput) -> Result<Page<BasinInfo>, S2Error> {
43 let response = self.client.list_basins(input.into()).await?;
44 Ok(Page::new(
45 response
46 .basins
47 .into_iter()
48 .map(Into::into)
49 .collect::<Vec<_>>(),
50 response.has_more,
51 ))
52 }
53
54 pub fn list_all_basins(&self, input: ListAllBasinsInput) -> Streaming<BasinInfo> {
56 let s2 = self.clone();
57 let prefix = input.prefix;
58 let start_after = input.start_after;
59 let include_deleted = input.include_deleted;
60 let mut input = ListBasinsInput::new()
61 .with_prefix(prefix)
62 .with_start_after(start_after);
63 Box::pin(async_stream::try_stream! {
64 loop {
65 let page = s2.list_basins(input.clone()).await?;
66
67 let start_after = page.values.last().map(|info| info.name.clone().into());
68 for info in page.values {
69 if !include_deleted && info.state == BasinState::Deleting {
70 continue;
71 }
72 yield info;
73 }
74
75 if page.has_more && let Some(start_after) = start_after {
76 input = input.with_start_after(start_after);
77 } else {
78 break;
79 }
80 }
81 })
82 }
83
84 pub async fn create_basin(&self, input: CreateBasinInput) -> Result<BasinInfo, S2Error> {
86 let (request, idempotency_token) = input.into();
87 let info = self.client.create_basin(request, idempotency_token).await?;
88 Ok(info.into())
89 }
90
91 pub async fn get_basin_config(&self, name: BasinName) -> Result<BasinConfig, S2Error> {
93 let config = self.client.get_basin_config(name).await?;
94 Ok(config.into())
95 }
96
97 pub async fn delete_basin(&self, input: DeleteBasinInput) -> Result<(), S2Error> {
99 Ok(self
100 .client
101 .delete_basin(input.name, input.ignore_not_found)
102 .await?)
103 }
104
105 pub async fn reconfigure_basin(
107 &self,
108 input: ReconfigureBasinInput,
109 ) -> Result<BasinConfig, S2Error> {
110 let config = self
111 .client
112 .reconfigure_basin(input.name, input.config.into())
113 .await?;
114 Ok(config.into())
115 }
116
117 pub async fn list_access_tokens(
121 &self,
122 input: ListAccessTokensInput,
123 ) -> Result<Page<AccessTokenInfo>, S2Error> {
124 let response = self.client.list_access_tokens(input.into()).await?;
125 Ok(Page::new(
126 response
127 .access_tokens
128 .into_iter()
129 .map(TryInto::try_into)
130 .collect::<Result<Vec<_>, _>>()?,
131 response.has_more,
132 ))
133 }
134
135 pub fn list_all_access_tokens(
137 &self,
138 input: ListAllAccessTokensInput,
139 ) -> Streaming<AccessTokenInfo> {
140 let s2 = self.clone();
141 let prefix = input.prefix;
142 let start_after = input.start_after;
143 let mut input = ListAccessTokensInput::new()
144 .with_prefix(prefix)
145 .with_start_after(start_after);
146 Box::pin(async_stream::try_stream! {
147 loop {
148 let page = s2.list_access_tokens(input.clone()).await?;
149
150 let start_after = page.values.last().map(|info| info.id.clone().into());
151 for info in page.values {
152 yield info;
153 }
154
155 if page.has_more && let Some(start_after) = start_after {
156 input = input.with_start_after(start_after);
157 } else {
158 break;
159 }
160 }
161 })
162 }
163
164 pub async fn issue_access_token(
166 &self,
167 input: IssueAccessTokenInput,
168 ) -> Result<String, S2Error> {
169 let response = self.client.issue_access_token(input.into()).await?;
170 Ok(response.access_token)
171 }
172
173 pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), S2Error> {
175 Ok(self.client.revoke_access_token(id).await?)
176 }
177
178 pub async fn get_account_metrics(
180 &self,
181 input: GetAccountMetricsInput,
182 ) -> Result<Vec<Metric>, S2Error> {
183 let response = self.client.get_account_metrics(input.into()).await?;
184 Ok(response.values.into_iter().map(Into::into).collect())
185 }
186
187 pub async fn get_basin_metrics(
189 &self,
190 input: GetBasinMetricsInput,
191 ) -> Result<Vec<Metric>, S2Error> {
192 let (name, request) = input.into();
193 let response = self.client.get_basin_metrics(name, request).await?;
194 Ok(response.values.into_iter().map(Into::into).collect())
195 }
196
197 pub async fn get_stream_metrics(
199 &self,
200 input: GetStreamMetricsInput,
201 ) -> Result<Vec<Metric>, S2Error> {
202 let (basin_name, stream_name, request) = input.into();
203 let response = self
204 .client
205 .get_stream_metrics(basin_name, stream_name, request)
206 .await?;
207 Ok(response.values.into_iter().map(Into::into).collect())
208 }
209}
210
211#[derive(Debug, Clone)]
212pub struct S2Basin {
216 client: BasinClient,
217}
218
219impl S2Basin {
220 pub fn stream(&self, name: StreamName) -> S2Stream {
222 S2Stream {
223 client: self.client.clone(),
224 name,
225 }
226 }
227
228 pub async fn list_streams(&self, input: ListStreamsInput) -> Result<Page<StreamInfo>, S2Error> {
232 let response = self.client.list_streams(input.into()).await?;
233 Ok(Page::new(
234 response
235 .streams
236 .into_iter()
237 .map(Into::into)
238 .collect::<Vec<_>>(),
239 response.has_more,
240 ))
241 }
242
243 pub fn list_all_streams(&self, input: ListAllStreamsInput) -> Streaming<StreamInfo> {
245 let basin = self.clone();
246 let prefix = input.prefix;
247 let start_after = input.start_after;
248 let include_deleted = input.include_deleted;
249 let mut input = ListStreamsInput::new()
250 .with_prefix(prefix)
251 .with_start_after(start_after);
252 Box::pin(async_stream::try_stream! {
253 loop {
254 let page = basin.list_streams(input.clone()).await?;
255
256 let start_after = page.values.last().map(|info| info.name.clone().into());
257 for info in page.values {
258 if !include_deleted && info.deleted_at.is_some() {
259 continue;
260 }
261 yield info;
262 }
263
264 if page.has_more && let Some(start_after) = start_after {
265 input = input.with_start_after(start_after);
266 } else {
267 break;
268 }
269 }
270 })
271 }
272
273 pub async fn create_stream(&self, input: CreateStreamInput) -> Result<StreamInfo, S2Error> {
275 let (request, idempotency_token) = input.into();
276 let info = self
277 .client
278 .create_stream(request, idempotency_token)
279 .await?;
280 Ok(info.into())
281 }
282
283 pub async fn get_stream_config(&self, name: StreamName) -> Result<StreamConfig, S2Error> {
285 let config = self.client.get_stream_config(name).await?;
286 Ok(config.into())
287 }
288
289 pub async fn delete_stream(&self, input: DeleteStreamInput) -> Result<(), S2Error> {
291 Ok(self
292 .client
293 .delete_stream(input.name, input.ignore_not_found)
294 .await?)
295 }
296
297 pub async fn reconfigure_stream(
299 &self,
300 input: ReconfigureStreamInput,
301 ) -> Result<StreamConfig, S2Error> {
302 let config = self
303 .client
304 .reconfigure_stream(input.name, input.config.into())
305 .await?;
306 Ok(config.into())
307 }
308}
309
310#[derive(Debug, Clone)]
311pub struct S2Stream {
315 client: BasinClient,
316 name: StreamName,
317}
318
319impl S2Stream {
320 pub async fn check_tail(&self) -> Result<StreamPosition, S2Error> {
322 let response = self.client.check_tail(&self.name).await?;
323 Ok(response.tail.into())
324 }
325
326 pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error> {
328 let retry_enabled = self
329 .client
330 .config
331 .retry
332 .append_retry_policy
333 .is_compliant(&input);
334 let ack = self
335 .client
336 .append(&self.name, input.into(), retry_enabled)
337 .await?;
338 Ok(ack.into())
339 }
340
341 pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error> {
343 let batch = self
344 .client
345 .read(&self.name, input.start.into(), input.stop.into())
346 .await?;
347 Ok(ReadBatch::from_api(batch, input.ignore_command_records))
348 }
349
350 pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession {
352 AppendSession::new(self.client.clone(), self.name.clone(), config)
353 }
354
355 pub fn producer(&self, config: ProducerConfig) -> Producer {
357 Producer::new(self.client.clone(), self.name.clone(), config)
358 }
359
360 pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error> {
362 let batches = session::read_session(
363 self.client.clone(),
364 self.name.clone(),
365 input.start.into(),
366 input.stop.into(),
367 input.ignore_command_records,
368 )
369 .await?;
370 Ok(Box::pin(batches.map(|res| match res {
371 Ok(batch) => Ok(batch),
372 Err(err) => Err(err.into()),
373 })))
374 }
375}