dynoxide/actions/
list_streams.rs1use crate::errors::Result;
2use crate::storage_backend::StorageBackend;
3use crate::streams;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Default, Deserialize)]
7pub struct ListStreamsRequest {
8 #[serde(rename = "TableName", default)]
9 pub table_name: Option<String>,
10 #[serde(rename = "ExclusiveStartStreamArn", default)]
11 pub exclusive_start_stream_arn: Option<String>,
12 #[serde(rename = "Limit", default)]
13 pub limit: Option<usize>,
14}
15
16#[derive(Debug, Default, Serialize)]
17pub struct ListStreamsResponse {
18 #[serde(rename = "Streams")]
19 pub streams: Vec<StreamSummary>,
20 #[serde(
21 rename = "LastEvaluatedStreamArn",
22 skip_serializing_if = "Option::is_none"
23 )]
24 pub last_evaluated_stream_arn: Option<String>,
25}
26
27#[derive(Debug, Default, Serialize)]
28pub struct StreamSummary {
29 #[serde(rename = "StreamArn")]
30 pub stream_arn: String,
31 #[serde(rename = "TableName")]
32 pub table_name: String,
33 #[serde(rename = "StreamLabel")]
34 pub stream_label: String,
35}
36
37pub async fn execute<S: StorageBackend>(
38 storage: &S,
39 request: ListStreamsRequest,
40) -> Result<ListStreamsResponse> {
41 let tables = storage.list_stream_enabled_tables().await?;
42
43 let mut summaries: Vec<StreamSummary> = tables
44 .into_iter()
45 .filter(|meta| {
46 if let Some(ref filter_table) = request.table_name {
47 &meta.table_name == filter_table
48 } else {
49 true
50 }
51 })
52 .map(|meta| {
53 let label = meta.stream_label.unwrap_or_default();
54 StreamSummary {
55 stream_arn: streams::stream_arn(&meta.table_name, &label),
56 table_name: meta.table_name,
57 stream_label: label,
58 }
59 })
60 .collect();
61
62 if let Some(ref start_arn) = request.exclusive_start_stream_arn {
64 if let Some(pos) = summaries.iter().position(|s| &s.stream_arn == start_arn) {
65 summaries = summaries.split_off(pos + 1);
66 }
67 }
68
69 let last_arn = if let Some(limit) = request.limit {
71 if summaries.len() > limit {
72 summaries.truncate(limit);
73 summaries.last().map(|s| s.stream_arn.clone())
74 } else {
75 None
76 }
77 } else {
78 None
79 };
80
81 Ok(ListStreamsResponse {
82 streams: summaries,
83 last_evaluated_stream_arn: last_arn,
84 })
85}