Skip to main content

dynoxide/actions/
list_streams.rs

1use crate::errors::Result;
2use crate::storage_backend::StorageBackend;
3use crate::streams;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Default, Deserialize)]
7pub struct ListStreamsRequest {
8    #[serde(rename = "TableName", default)]
9    pub table_name: Option<String>,
10    #[serde(rename = "ExclusiveStartStreamArn", default)]
11    pub exclusive_start_stream_arn: Option<String>,
12    #[serde(rename = "Limit", default)]
13    pub limit: Option<usize>,
14}
15
16#[derive(Debug, Default, Serialize)]
17pub struct ListStreamsResponse {
18    #[serde(rename = "Streams")]
19    pub streams: Vec<StreamSummary>,
20    #[serde(
21        rename = "LastEvaluatedStreamArn",
22        skip_serializing_if = "Option::is_none"
23    )]
24    pub last_evaluated_stream_arn: Option<String>,
25}
26
27#[derive(Debug, Default, Serialize)]
28pub struct StreamSummary {
29    #[serde(rename = "StreamArn")]
30    pub stream_arn: String,
31    #[serde(rename = "TableName")]
32    pub table_name: String,
33    #[serde(rename = "StreamLabel")]
34    pub stream_label: String,
35}
36
37pub async fn execute<S: StorageBackend>(
38    storage: &S,
39    request: ListStreamsRequest,
40) -> Result<ListStreamsResponse> {
41    let tables = storage.list_stream_enabled_tables().await?;
42
43    let mut summaries: Vec<StreamSummary> = tables
44        .into_iter()
45        .filter(|meta| {
46            if let Some(ref filter_table) = request.table_name {
47                &meta.table_name == filter_table
48            } else {
49                true
50            }
51        })
52        .map(|meta| {
53            let label = meta.stream_label.unwrap_or_default();
54            StreamSummary {
55                stream_arn: streams::stream_arn(&meta.table_name, &label),
56                table_name: meta.table_name,
57                stream_label: label,
58            }
59        })
60        .collect();
61
62    // Apply ExclusiveStartStreamArn pagination
63    if let Some(ref start_arn) = request.exclusive_start_stream_arn {
64        if let Some(pos) = summaries.iter().position(|s| &s.stream_arn == start_arn) {
65            summaries = summaries.split_off(pos + 1);
66        }
67    }
68
69    // Apply limit
70    let last_arn = if let Some(limit) = request.limit {
71        if summaries.len() > limit {
72            summaries.truncate(limit);
73            summaries.last().map(|s| s.stream_arn.clone())
74        } else {
75            None
76        }
77    } else {
78        None
79    };
80
81    Ok(ListStreamsResponse {
82        streams: summaries,
83        last_evaluated_stream_arn: last_arn,
84    })
85}