dynoxide/actions/
list_streams.rs1use crate::errors::Result;
2use crate::storage::Storage;
3use crate::streams;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Default, Deserialize)]
7pub struct ListStreamsRequest {
8 #[serde(rename = "TableName", default)]
9 pub table_name: Option<String>,
10 #[serde(rename = "ExclusiveStartStreamArn", default)]
11 pub exclusive_start_stream_arn: Option<String>,
12 #[serde(rename = "Limit", default)]
13 pub limit: Option<usize>,
14}
15
16#[derive(Debug, Default, Serialize)]
17pub struct ListStreamsResponse {
18 #[serde(rename = "Streams")]
19 pub streams: Vec<StreamSummary>,
20 #[serde(
21 rename = "LastEvaluatedStreamArn",
22 skip_serializing_if = "Option::is_none"
23 )]
24 pub last_evaluated_stream_arn: Option<String>,
25}
26
27#[derive(Debug, Default, Serialize)]
28pub struct StreamSummary {
29 #[serde(rename = "StreamArn")]
30 pub stream_arn: String,
31 #[serde(rename = "TableName")]
32 pub table_name: String,
33 #[serde(rename = "StreamLabel")]
34 pub stream_label: String,
35}
36
37pub fn execute(storage: &Storage, request: ListStreamsRequest) -> Result<ListStreamsResponse> {
38 let tables = storage.list_stream_enabled_tables()?;
39
40 let mut summaries: Vec<StreamSummary> = tables
41 .into_iter()
42 .filter(|meta| {
43 if let Some(ref filter_table) = request.table_name {
44 &meta.table_name == filter_table
45 } else {
46 true
47 }
48 })
49 .map(|meta| {
50 let label = meta.stream_label.unwrap_or_default();
51 StreamSummary {
52 stream_arn: streams::stream_arn(&meta.table_name, &label),
53 table_name: meta.table_name,
54 stream_label: label,
55 }
56 })
57 .collect();
58
59 if let Some(ref start_arn) = request.exclusive_start_stream_arn {
61 if let Some(pos) = summaries.iter().position(|s| &s.stream_arn == start_arn) {
62 summaries = summaries.split_off(pos + 1);
63 }
64 }
65
66 let last_arn = if let Some(limit) = request.limit {
68 if summaries.len() > limit {
69 summaries.truncate(limit);
70 summaries.last().map(|s| s.stream_arn.clone())
71 } else {
72 None
73 }
74 } else {
75 None
76 };
77
78 Ok(ListStreamsResponse {
79 streams: summaries,
80 last_evaluated_stream_arn: last_arn,
81 })
82}