dynoxide/actions/
describe_stream.rs1use crate::errors::{DynoxideError, Result};
2use crate::storage::Storage;
3use crate::streams;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Default, Deserialize)]
7pub struct DescribeStreamRequest {
8 #[serde(rename = "StreamArn")]
9 pub stream_arn: String,
10 #[serde(rename = "ExclusiveStartShardId", default)]
11 pub exclusive_start_shard_id: Option<String>,
12 #[serde(rename = "Limit", default)]
13 pub limit: Option<usize>,
14}
15
16#[derive(Debug, Default, Serialize)]
17pub struct DescribeStreamResponse {
18 #[serde(rename = "StreamDescription")]
19 pub stream_description: StreamDescription,
20}
21
22#[derive(Debug, Default, Serialize)]
23pub struct StreamDescription {
24 #[serde(rename = "StreamArn")]
25 pub stream_arn: String,
26 #[serde(rename = "StreamLabel")]
27 pub stream_label: String,
28 #[serde(rename = "StreamStatus")]
29 pub stream_status: String,
30 #[serde(rename = "StreamViewType")]
31 pub stream_view_type: String,
32 #[serde(rename = "TableName")]
33 pub table_name: String,
34 #[serde(rename = "Shards")]
35 pub shards: Vec<Shard>,
36 #[serde(rename = "CreationRequestDateTime")]
37 pub creation_request_date_time: f64,
38}
39
40#[derive(Debug, Default, Serialize)]
41pub struct Shard {
42 #[serde(rename = "ShardId")]
43 pub shard_id: String,
44 #[serde(rename = "SequenceNumberRange")]
45 pub sequence_number_range: SequenceNumberRange,
46}
47
48#[derive(Debug, Default, Serialize)]
49pub struct SequenceNumberRange {
50 #[serde(
51 rename = "StartingSequenceNumber",
52 skip_serializing_if = "Option::is_none"
53 )]
54 pub starting_sequence_number: Option<String>,
55 #[serde(
56 rename = "EndingSequenceNumber",
57 skip_serializing_if = "Option::is_none"
58 )]
59 pub ending_sequence_number: Option<String>,
60}
61
62pub fn execute(
63 storage: &Storage,
64 request: DescribeStreamRequest,
65) -> Result<DescribeStreamResponse> {
66 let table_name = parse_table_name_from_arn(&request.stream_arn).ok_or_else(|| {
68 DynoxideError::ResourceNotFoundException(format!(
69 "Requested resource not found: Stream: {}",
70 request.stream_arn
71 ))
72 })?;
73
74 let meta = storage.get_table_metadata(&table_name)?.ok_or_else(|| {
75 DynoxideError::ResourceNotFoundException(format!(
76 "Requested resource not found: Stream: {}",
77 request.stream_arn
78 ))
79 })?;
80
81 if !meta.stream_enabled {
82 return Err(DynoxideError::ResourceNotFoundException(format!(
83 "Requested resource not found: Stream: {}",
84 request.stream_arn
85 )));
86 }
87
88 let label = meta.stream_label.clone().unwrap_or_default();
89 let sid = streams::shard_id(&table_name);
90 let (start_seq, end_seq) = storage.get_shard_sequence_range(&table_name, &sid)?;
91
92 Ok(DescribeStreamResponse {
93 stream_description: StreamDescription {
94 stream_arn: request.stream_arn,
95 stream_label: label,
96 stream_status: "ENABLED".to_string(),
97 stream_view_type: meta
98 .stream_view_type
99 .unwrap_or_else(|| "NEW_AND_OLD_IMAGES".to_string()),
100 table_name,
101 shards: vec![Shard {
102 shard_id: sid,
103 sequence_number_range: SequenceNumberRange {
104 starting_sequence_number: start_seq,
105 ending_sequence_number: end_seq,
106 },
107 }],
108 creation_request_date_time: meta.created_at as f64,
109 },
110 })
111}
112
113fn parse_table_name_from_arn(arn: &str) -> Option<String> {
114 let parts: Vec<&str> = arn.split('/').collect();
116 if parts.len() >= 2 {
117 Some(parts[1].to_string())
118 } else {
119 None
120 }
121}