Skip to main content

dynoxide/actions/
describe_stream.rs

1use crate::errors::{DynoxideError, Result};
2use crate::storage_backend::StorageBackend;
3use crate::streams;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Default, Deserialize)]
7pub struct DescribeStreamRequest {
8    #[serde(rename = "StreamArn")]
9    pub stream_arn: String,
10    #[serde(rename = "ExclusiveStartShardId", default)]
11    pub exclusive_start_shard_id: Option<String>,
12    #[serde(rename = "Limit", default)]
13    pub limit: Option<usize>,
14}
15
16#[derive(Debug, Default, Serialize)]
17pub struct DescribeStreamResponse {
18    #[serde(rename = "StreamDescription")]
19    pub stream_description: StreamDescription,
20}
21
22#[derive(Debug, Default, Serialize)]
23pub struct StreamDescription {
24    #[serde(rename = "StreamArn")]
25    pub stream_arn: String,
26    #[serde(rename = "StreamLabel")]
27    pub stream_label: String,
28    #[serde(rename = "StreamStatus")]
29    pub stream_status: String,
30    #[serde(rename = "StreamViewType")]
31    pub stream_view_type: String,
32    #[serde(rename = "TableName")]
33    pub table_name: String,
34    #[serde(rename = "Shards")]
35    pub shards: Vec<Shard>,
36    #[serde(rename = "CreationRequestDateTime")]
37    pub creation_request_date_time: f64,
38}
39
40#[derive(Debug, Default, Serialize)]
41pub struct Shard {
42    #[serde(rename = "ShardId")]
43    pub shard_id: String,
44    #[serde(rename = "SequenceNumberRange")]
45    pub sequence_number_range: SequenceNumberRange,
46}
47
48#[derive(Debug, Default, Serialize)]
49pub struct SequenceNumberRange {
50    #[serde(
51        rename = "StartingSequenceNumber",
52        skip_serializing_if = "Option::is_none"
53    )]
54    pub starting_sequence_number: Option<String>,
55    #[serde(
56        rename = "EndingSequenceNumber",
57        skip_serializing_if = "Option::is_none"
58    )]
59    pub ending_sequence_number: Option<String>,
60}
61
62pub async fn execute<S: StorageBackend>(
63    storage: &S,
64    request: DescribeStreamRequest,
65) -> Result<DescribeStreamResponse> {
66    // Parse table name from ARN
67    let table_name = parse_table_name_from_arn(&request.stream_arn).ok_or_else(|| {
68        DynoxideError::ResourceNotFoundException(format!(
69            "Requested resource not found: Stream: {}",
70            request.stream_arn
71        ))
72    })?;
73
74    let meta = storage
75        .get_table_metadata(&table_name)
76        .await?
77        .ok_or_else(|| {
78            DynoxideError::ResourceNotFoundException(format!(
79                "Requested resource not found: Stream: {}",
80                request.stream_arn
81            ))
82        })?;
83
84    if !meta.stream_enabled {
85        return Err(DynoxideError::ResourceNotFoundException(format!(
86            "Requested resource not found: Stream: {}",
87            request.stream_arn
88        )));
89    }
90
91    let label = meta.stream_label.clone().unwrap_or_default();
92    let sid = streams::shard_id(&table_name);
93    let (start_seq, end_seq) = storage.get_shard_sequence_range(&table_name, &sid).await?;
94
95    Ok(DescribeStreamResponse {
96        stream_description: StreamDescription {
97            stream_arn: request.stream_arn,
98            stream_label: label,
99            stream_status: "ENABLED".to_string(),
100            stream_view_type: meta
101                .stream_view_type
102                .unwrap_or_else(|| "NEW_AND_OLD_IMAGES".to_string()),
103            table_name,
104            shards: vec![Shard {
105                shard_id: sid,
106                sequence_number_range: SequenceNumberRange {
107                    starting_sequence_number: start_seq,
108                    ending_sequence_number: end_seq,
109                },
110            }],
111            creation_request_date_time: meta.created_at as f64,
112        },
113    })
114}
115
/// Extracts the table name from a stream ARN.
///
/// The expected shape is
/// `arn:aws:dynamodb:dynoxide:000000000000:table/{table_name}/stream/{label}`.
///
/// Returns `Some(table_name)` only when the part before the first `/` starts
/// with `arn:` and ends with `:table`, the table name and label are both
/// non-empty, and the segment between them is exactly `stream`. Any other
/// input yields `None`, so arbitrary strings that merely contain a `/` are
/// not mistaken for stream ARNs. Segments after the label are ignored.
fn parse_table_name_from_arn(arn: &str) -> Option<String> {
    let mut segments = arn.split('/');

    // Each `?` rejects an ARN that is too short to hold the required part.
    let resource_prefix = segments.next()?;
    let table_name = segments.next()?;
    let stream_marker = segments.next()?;
    let label = segments.next()?;

    let well_formed = resource_prefix.starts_with("arn:")
        && resource_prefix.ends_with(":table")
        && !table_name.is_empty()
        && stream_marker == "stream"
        && !label.is_empty();

    if well_formed {
        Some(table_name.to_string())
    } else {
        None
    }
}
124}