dynoxide/actions/
describe_stream.rs1use crate::errors::{DynoxideError, Result};
2use crate::storage_backend::StorageBackend;
3use crate::streams;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Default, Deserialize)]
7pub struct DescribeStreamRequest {
8 #[serde(rename = "StreamArn")]
9 pub stream_arn: String,
10 #[serde(rename = "ExclusiveStartShardId", default)]
11 pub exclusive_start_shard_id: Option<String>,
12 #[serde(rename = "Limit", default)]
13 pub limit: Option<usize>,
14}
15
16#[derive(Debug, Default, Serialize)]
17pub struct DescribeStreamResponse {
18 #[serde(rename = "StreamDescription")]
19 pub stream_description: StreamDescription,
20}
21
22#[derive(Debug, Default, Serialize)]
23pub struct StreamDescription {
24 #[serde(rename = "StreamArn")]
25 pub stream_arn: String,
26 #[serde(rename = "StreamLabel")]
27 pub stream_label: String,
28 #[serde(rename = "StreamStatus")]
29 pub stream_status: String,
30 #[serde(rename = "StreamViewType")]
31 pub stream_view_type: String,
32 #[serde(rename = "TableName")]
33 pub table_name: String,
34 #[serde(rename = "Shards")]
35 pub shards: Vec<Shard>,
36 #[serde(rename = "CreationRequestDateTime")]
37 pub creation_request_date_time: f64,
38}
39
40#[derive(Debug, Default, Serialize)]
41pub struct Shard {
42 #[serde(rename = "ShardId")]
43 pub shard_id: String,
44 #[serde(rename = "SequenceNumberRange")]
45 pub sequence_number_range: SequenceNumberRange,
46}
47
48#[derive(Debug, Default, Serialize)]
49pub struct SequenceNumberRange {
50 #[serde(
51 rename = "StartingSequenceNumber",
52 skip_serializing_if = "Option::is_none"
53 )]
54 pub starting_sequence_number: Option<String>,
55 #[serde(
56 rename = "EndingSequenceNumber",
57 skip_serializing_if = "Option::is_none"
58 )]
59 pub ending_sequence_number: Option<String>,
60}
61
62pub async fn execute<S: StorageBackend>(
63 storage: &S,
64 request: DescribeStreamRequest,
65) -> Result<DescribeStreamResponse> {
66 let table_name = parse_table_name_from_arn(&request.stream_arn).ok_or_else(|| {
68 DynoxideError::ResourceNotFoundException(format!(
69 "Requested resource not found: Stream: {}",
70 request.stream_arn
71 ))
72 })?;
73
74 let meta = storage
75 .get_table_metadata(&table_name)
76 .await?
77 .ok_or_else(|| {
78 DynoxideError::ResourceNotFoundException(format!(
79 "Requested resource not found: Stream: {}",
80 request.stream_arn
81 ))
82 })?;
83
84 if !meta.stream_enabled {
85 return Err(DynoxideError::ResourceNotFoundException(format!(
86 "Requested resource not found: Stream: {}",
87 request.stream_arn
88 )));
89 }
90
91 let label = meta.stream_label.clone().unwrap_or_default();
92 let sid = streams::shard_id(&table_name);
93 let (start_seq, end_seq) = storage.get_shard_sequence_range(&table_name, &sid).await?;
94
95 Ok(DescribeStreamResponse {
96 stream_description: StreamDescription {
97 stream_arn: request.stream_arn,
98 stream_label: label,
99 stream_status: "ENABLED".to_string(),
100 stream_view_type: meta
101 .stream_view_type
102 .unwrap_or_else(|| "NEW_AND_OLD_IMAGES".to_string()),
103 table_name,
104 shards: vec![Shard {
105 shard_id: sid,
106 sequence_number_range: SequenceNumberRange {
107 starting_sequence_number: start_seq,
108 ending_sequence_number: end_seq,
109 },
110 }],
111 creation_request_date_time: meta.created_at as f64,
112 },
113 })
114}
115
/// Extracts the table name from a stream ARN.
///
/// Returns the second `/`-separated segment of `arn` as an owned string, or
/// `None` when `arn` contains no `/`. The segment is returned as-is, so it may
/// be empty (for example for `"a/"`).
// NOTE(review): presumably ARNs look like `...:table/<name>/stream/<label>` — confirm.
fn parse_table_name_from_arn(arn: &str) -> Option<String> {
    arn.split('/').nth(1).map(str::to_owned)
}