1use crate::{
2 custom_serde::{deserialize_nullish, float_unix_epoch},
3 streams::DynamoDbBatchItemFailure,
4 time_window::*,
5};
6#[cfg(feature = "builders")]
7use bon::Builder;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10#[cfg(feature = "catch-all-fields")]
11use serde_json::Value;
12use std::fmt;
13
14#[cfg(test)]
15mod attributes;
16
17#[non_exhaustive]
18#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
19#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
20pub enum StreamViewType {
21 NewImage,
22 OldImage,
23 NewAndOldImages,
24 #[default]
25 KeysOnly,
26}
27
28impl fmt::Display for StreamViewType {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 let val = match self {
31 StreamViewType::NewImage => "NEW_IMAGE",
32 StreamViewType::OldImage => "OLD_IMAGE",
33 StreamViewType::NewAndOldImages => "NEW_AND_OLD_IMAGES",
34 StreamViewType::KeysOnly => "KEYS_ONLY",
35 };
36 write!(f, "{val}")
37 }
38}
39
40#[non_exhaustive]
41#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
42#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
43pub enum StreamStatus {
44 Enabling,
45 Enabled,
46 Disabling,
47 #[default]
48 Disabled,
49}
50
51impl fmt::Display for StreamStatus {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 let val = match self {
54 StreamStatus::Enabling => "ENABLING",
55 StreamStatus::Enabled => "ENABLED",
56 StreamStatus::Disabling => "DISABLING",
57 StreamStatus::Disabled => "DISABLED",
58 };
59 write!(f, "{val}")
60 }
61}
62
63#[non_exhaustive]
64#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
65#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
66pub enum SharedIteratorType {
67 TrimHorizon,
68 #[default]
69 Latest,
70 AtSequenceNumber,
71 AfterSequenceNumber,
72}
73
74impl fmt::Display for SharedIteratorType {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 let val = match self {
77 SharedIteratorType::TrimHorizon => "TRIM_HORIZON",
78 SharedIteratorType::Latest => "LATEST",
79 SharedIteratorType::AtSequenceNumber => "AT_SEQUENCE_NUMBER",
80 SharedIteratorType::AfterSequenceNumber => "AFTER_SEQUENCE_NUMBER",
81 };
82 write!(f, "{val}")
83 }
84}
85
86#[non_exhaustive]
87#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
88#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
89pub enum OperationType {
90 #[default]
91 Insert,
92 Modify,
93 Remove,
94}
95
96impl fmt::Display for OperationType {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 let val = match self {
99 OperationType::Insert => "INSERT",
100 OperationType::Modify => "MODIFY",
101 OperationType::Remove => "REMOVE",
102 };
103 write!(f, "{val}")
104 }
105}
106
107#[non_exhaustive]
108#[derive(Clone, Default, Debug, Deserialize, Eq, PartialEq, Serialize)]
109#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
110pub enum KeyType {
111 #[default]
112 Hash,
113 Range,
114}
115
116impl fmt::Display for KeyType {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 let val = match self {
119 KeyType::Hash => "HASH",
120 KeyType::Range => "RANGE",
121 };
122 write!(f, "{val}")
123 }
124}
125
126#[non_exhaustive]
129#[cfg_attr(feature = "builders", derive(Builder))]
130#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
131pub struct Event {
132 #[serde(rename = "Records")]
133 pub records: Vec<EventRecord>,
134 #[cfg(feature = "catch-all-fields")]
138 #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
139 #[serde(flatten)]
140 #[cfg_attr(feature = "builders", builder(default))]
141 pub other: serde_json::Map<String, Value>,
142}
143
144#[non_exhaustive]
147#[cfg_attr(feature = "builders", derive(Builder))]
148#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
149#[serde(rename_all = "camelCase")]
150pub struct TimeWindowEvent {
151 #[serde(rename = "DynamoDBEvent")]
152 #[serde(flatten)]
153 pub dynamo_db_event: Event,
154 #[serde(rename = "TimeWindowProperties")]
155 #[serde(flatten)]
156 pub time_window_properties: TimeWindowProperties,
157 #[cfg(feature = "catch-all-fields")]
161 #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
162 #[serde(flatten)]
163 #[cfg_attr(feature = "builders", builder(default))]
164 pub other: serde_json::Map<String, Value>,
165}
166
167#[non_exhaustive]
169#[cfg_attr(feature = "builders", derive(Builder))]
170#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
171#[serde(rename_all = "camelCase")]
172pub struct TimeWindowEventResponse {
173 #[serde(rename = "TimeWindowEventResponseProperties")]
174 #[serde(flatten)]
175 pub time_window_event_response_properties: TimeWindowEventResponseProperties,
176 pub batch_item_failures: Vec<DynamoDbBatchItemFailure>,
177 #[cfg(feature = "catch-all-fields")]
181 #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
182 #[serde(flatten)]
183 #[cfg_attr(feature = "builders", builder(default))]
184 pub other: serde_json::Map<String, Value>,
185}
186
187#[non_exhaustive]
189#[cfg_attr(feature = "builders", derive(Builder))]
190#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
191#[serde(rename_all = "camelCase")]
192pub struct EventRecord {
193 pub aws_region: String,
195 #[serde(rename = "dynamodb")]
198 pub change: StreamRecord,
199 #[serde(rename = "eventID")]
202 pub event_id: String,
203 pub event_name: String,
211 #[serde(default)]
214 pub event_source: Option<String>,
215 #[serde(default)]
222 pub event_version: Option<String>,
223 #[serde(rename = "eventSourceARN")]
225 #[serde(default)]
226 pub event_source_arn: Option<String>,
227 #[serde(default)]
238 pub user_identity: Option<UserIdentity>,
239 #[serde(default)]
243 pub record_format: Option<String>,
244 #[serde(default)]
246 pub table_name: Option<String>,
247 #[cfg(feature = "catch-all-fields")]
251 #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
252 #[serde(flatten)]
253 #[cfg_attr(feature = "builders", builder(default))]
254 pub other: serde_json::Map<String, Value>,
255}
256
257#[non_exhaustive]
258#[cfg_attr(feature = "builders", derive(Builder))]
259#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
260#[serde(rename_all = "camelCase")]
261pub struct UserIdentity {
262 #[serde(default)]
263 pub type_: String,
264 #[serde(default)]
265 pub principal_id: String,
266 #[cfg(feature = "catch-all-fields")]
270 #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
271 #[serde(flatten)]
272 #[cfg_attr(feature = "builders", builder(default))]
273 pub other: serde_json::Map<String, Value>,
274}
275
276#[non_exhaustive]
279#[cfg_attr(feature = "builders", derive(Builder))]
280#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
281#[serde(rename_all = "camelCase")]
282pub struct StreamRecord {
283 #[serde(rename = "ApproximateCreationDateTime")]
287 #[serde(with = "float_unix_epoch")]
288 #[serde(default)]
289 pub approximate_creation_date_time: DateTime<Utc>,
290 #[serde(deserialize_with = "deserialize_nullish")]
292 #[serde(default)]
293 #[serde(rename = "Keys")]
294 pub keys: serde_dynamo::Item,
295 #[serde(deserialize_with = "deserialize_nullish")]
297 #[serde(default)]
298 #[serde(rename = "NewImage")]
299 pub new_image: serde_dynamo::Item,
300 #[serde(deserialize_with = "deserialize_nullish")]
302 #[serde(default)]
303 #[serde(rename = "OldImage")]
304 pub old_image: serde_dynamo::Item,
305 #[serde(default)]
307 #[serde(rename = "SequenceNumber")]
308 pub sequence_number: Option<String>,
309 #[serde(rename = "SizeBytes")]
311 pub size_bytes: i64,
312 #[serde(default)]
315 #[serde(rename = "StreamViewType")]
316 pub stream_view_type: Option<StreamViewType>,
317 #[cfg(feature = "catch-all-fields")]
321 #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
322 #[serde(flatten)]
323 #[cfg_attr(feature = "builders", builder(default))]
324 pub other: serde_json::Map<String, Value>,
325}
326
#[cfg(test)]
// `deprecated` stays allowed because the glob import below may pull in
// deprecated items — TODO confirm whether it is still needed.
#[allow(deprecated)]
mod test {
    use super::*;
    use crate::fixtures::verify_serde_roundtrip;
    use chrono::TimeZone;

    /// Round-trips the example event through JSON and checks the creation
    /// timestamp of its last record.
    #[test]
    #[cfg(feature = "dynamodb")]
    fn example_dynamodb_event() {
        let data = include_bytes!("../../fixtures/example-dynamodb-event.json");
        let mut parsed: Event = serde_json::from_slice(data).unwrap();
        let output: String = serde_json::to_string(&parsed).unwrap();
        let reparsed: Event = serde_json::from_slice(output.as_bytes()).unwrap();
        assert_eq!(parsed, reparsed);

        let event = parsed.records.pop().unwrap();
        // `with_ymd_and_hms` replaces the deprecated `ymd(..).and_hms(..)` pair.
        let date = Utc.with_ymd_and_hms(2016, 12, 2, 1, 27, 0).unwrap();
        assert_eq!(date, event.change.approximate_creation_date_time);
    }

    /// The null-items fixture must deserialize to empty key and image items.
    #[test]
    #[cfg(feature = "dynamodb")]
    fn example_dynamodb_event_null_items() {
        let data = include_bytes!("../../fixtures/example-dynamodb-event-null-items.json");
        let event = verify_serde_roundtrip::<Event>(data);
        assert_eq!(0, event.records[0].change.keys.len());
        assert_eq!(0, event.records[0].change.new_image.len());
        assert_eq!(0, event.records[0].change.old_image.len());
    }

    /// Round-trips a single record from the optional-fields fixture and
    /// expects its creation time to equal the Unix epoch.
    #[test]
    #[cfg(feature = "dynamodb")]
    fn example_dynamodb_event_with_optional_fields() {
        let data =
            include_bytes!("../../fixtures/example-dynamodb-event-record-with-optional-fields.json");
        let parsed: EventRecord = serde_json::from_slice(data).unwrap();
        let output: String = serde_json::to_string(&parsed).unwrap();
        let reparsed: EventRecord = serde_json::from_slice(output.as_bytes()).unwrap();
        assert_eq!(parsed, reparsed);

        let date = Utc.timestamp_micros(0).unwrap();
        assert_eq!(date, reparsed.change.approximate_creation_date_time);
    }
}