Skip to main content

aws_lambda_events/event/dynamodb/
mod.rs

1use crate::{
2    custom_serde::{deserialize_nullish, float_unix_epoch},
3    streams::DynamoDbBatchItemFailure,
4    time_window::*,
5};
6#[cfg(feature = "builders")]
7use bon::Builder;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10#[cfg(feature = "catch-all-fields")]
11use serde_json::Value;
12use std::fmt;
13
14#[cfg(test)]
15mod attributes;
16
17#[non_exhaustive]
18#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
19#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
20pub enum StreamViewType {
21    NewImage,
22    OldImage,
23    NewAndOldImages,
24    #[default]
25    KeysOnly,
26}
27
28impl fmt::Display for StreamViewType {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        let val = match self {
31            StreamViewType::NewImage => "NEW_IMAGE",
32            StreamViewType::OldImage => "OLD_IMAGE",
33            StreamViewType::NewAndOldImages => "NEW_AND_OLD_IMAGES",
34            StreamViewType::KeysOnly => "KEYS_ONLY",
35        };
36        write!(f, "{val}")
37    }
38}
39
40#[non_exhaustive]
41#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
42#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
43pub enum StreamStatus {
44    Enabling,
45    Enabled,
46    Disabling,
47    #[default]
48    Disabled,
49}
50
51impl fmt::Display for StreamStatus {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        let val = match self {
54            StreamStatus::Enabling => "ENABLING",
55            StreamStatus::Enabled => "ENABLED",
56            StreamStatus::Disabling => "DISABLING",
57            StreamStatus::Disabled => "DISABLED",
58        };
59        write!(f, "{val}")
60    }
61}
62
63#[non_exhaustive]
64#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
65#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
66pub enum SharedIteratorType {
67    TrimHorizon,
68    #[default]
69    Latest,
70    AtSequenceNumber,
71    AfterSequenceNumber,
72}
73
74impl fmt::Display for SharedIteratorType {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        let val = match self {
77            SharedIteratorType::TrimHorizon => "TRIM_HORIZON",
78            SharedIteratorType::Latest => "LATEST",
79            SharedIteratorType::AtSequenceNumber => "AT_SEQUENCE_NUMBER",
80            SharedIteratorType::AfterSequenceNumber => "AFTER_SEQUENCE_NUMBER",
81        };
82        write!(f, "{val}")
83    }
84}
85
86#[non_exhaustive]
87#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
88#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
89pub enum OperationType {
90    #[default]
91    Insert,
92    Modify,
93    Remove,
94}
95
96impl fmt::Display for OperationType {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        let val = match self {
99            OperationType::Insert => "INSERT",
100            OperationType::Modify => "MODIFY",
101            OperationType::Remove => "REMOVE",
102        };
103        write!(f, "{val}")
104    }
105}
106
107#[non_exhaustive]
108#[derive(Clone, Default, Debug, Deserialize, Eq, PartialEq, Serialize)]
109#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
110pub enum KeyType {
111    #[default]
112    Hash,
113    Range,
114}
115
116impl fmt::Display for KeyType {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        let val = match self {
119            KeyType::Hash => "HASH",
120            KeyType::Range => "RANGE",
121        };
122        write!(f, "{val}")
123    }
124}
125
126/// The `Event` stream event handled to Lambda
127/// <http://docs.aws.amazon.com/lambda/latest/dg/eventsources.html#eventsources-ddb-update>
128#[non_exhaustive]
129#[cfg_attr(feature = "builders", derive(Builder))]
130#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
131pub struct Event {
132    #[serde(rename = "Records")]
133    pub records: Vec<EventRecord>,
134    /// Catchall to catch any additional fields that were present but not explicitly defined by this struct.
135    /// Enabled with Cargo feature `catch-all-fields`.
136    /// If `catch-all-fields` is disabled, any additional fields that are present will be ignored.
137    #[cfg(feature = "catch-all-fields")]
138    #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
139    #[serde(flatten)]
140    #[cfg_attr(feature = "builders", builder(default))]
141    pub other: serde_json::Map<String, Value>,
142}
143
144/// `TimeWindowEvent` represents an Amazon Dynamodb event when using time windows
145/// ref. <https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows>
146#[non_exhaustive]
147#[cfg_attr(feature = "builders", derive(Builder))]
148#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
149#[serde(rename_all = "camelCase")]
150pub struct TimeWindowEvent {
151    #[serde(rename = "DynamoDBEvent")]
152    #[serde(flatten)]
153    pub dynamo_db_event: Event,
154    #[serde(rename = "TimeWindowProperties")]
155    #[serde(flatten)]
156    pub time_window_properties: TimeWindowProperties,
157    /// Catchall to catch any additional fields that were present but not explicitly defined by this struct.
158    /// Enabled with Cargo feature `catch-all-fields`.
159    /// If `catch-all-fields` is disabled, any additional fields that are present will be ignored.
160    #[cfg(feature = "catch-all-fields")]
161    #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
162    #[serde(flatten)]
163    #[cfg_attr(feature = "builders", builder(default))]
164    pub other: serde_json::Map<String, Value>,
165}
166
167/// `TimeWindowEventResponse` is the outer structure to report batch item failures for DynamoDBTimeWindowEvent.
168#[non_exhaustive]
169#[cfg_attr(feature = "builders", derive(Builder))]
170#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
171#[serde(rename_all = "camelCase")]
172pub struct TimeWindowEventResponse {
173    #[serde(rename = "TimeWindowEventResponseProperties")]
174    #[serde(flatten)]
175    pub time_window_event_response_properties: TimeWindowEventResponseProperties,
176    pub batch_item_failures: Vec<DynamoDbBatchItemFailure>,
177    /// Catchall to catch any additional fields that were present but not explicitly defined by this struct.
178    /// Enabled with Cargo feature `catch-all-fields`.
179    /// If `catch-all-fields` is disabled, any additional fields that are present will be ignored.
180    #[cfg(feature = "catch-all-fields")]
181    #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
182    #[serde(flatten)]
183    #[cfg_attr(feature = "builders", builder(default))]
184    pub other: serde_json::Map<String, Value>,
185}
186
187/// EventRecord stores information about each record of a DynamoDb stream event
188#[non_exhaustive]
189#[cfg_attr(feature = "builders", derive(Builder))]
190#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
191#[serde(rename_all = "camelCase")]
192pub struct EventRecord {
193    /// The region in which the GetRecords request was received.
194    pub aws_region: String,
195    /// The main body of the stream record, containing all of the DynamoDB-specific
196    /// fields.
197    #[serde(rename = "dynamodb")]
198    pub change: StreamRecord,
199    /// A globally unique identifier for the event that was recorded in this stream
200    /// record.
201    #[serde(rename = "eventID")]
202    pub event_id: String,
203    /// The type of data modification that was performed on the DynamoDB table:
204    ///
205    /// * INSERT - a new item was added to the table.
206    ///
207    /// * MODIFY - one or more of an existing item's attributes were modified.
208    ///
209    /// * REMOVE - the item was deleted from the table
210    pub event_name: String,
211    /// The AWS service from which the stream record originated. For DynamoDB Streams,
212    /// this is aws:dynamodb.
213    #[serde(default)]
214    pub event_source: Option<String>,
215    /// The version number of the stream record format. This number is updated whenever
216    /// the structure of Record is modified.
217    ///
218    /// Client applications must not assume that eventVersion will remain at a particular
219    /// value, as this number is subject to change at any time. In general, eventVersion
220    /// will only increase as the low-level DynamoDB Streams API evolves.
221    #[serde(default)]
222    pub event_version: Option<String>,
223    /// The event source ARN of DynamoDB
224    #[serde(rename = "eventSourceARN")]
225    #[serde(default)]
226    pub event_source_arn: Option<String>,
227    /// Items that are deleted by the Time to Live process after expiration have
228    /// the following fields:
229    ///
230    /// * Records[].userIdentity.type
231    ///
232    /// "Service"
233    ///
234    /// * Records[].userIdentity.principalId
235    ///
236    /// "dynamodb.amazonaws.com"
237    #[serde(default)]
238    pub user_identity: Option<UserIdentity>,
239    /// Describes the record format and relevant mapping information that
240    /// should be applied to schematize the records on the stream. For
241    /// DynamoDB Streams, this is application/json.
242    #[serde(default)]
243    pub record_format: Option<String>,
244    /// The DynamoDB table that this event was recorded for.
245    #[serde(default)]
246    pub table_name: Option<String>,
247    /// Catchall to catch any additional fields that were present but not explicitly defined by this struct.
248    /// Enabled with Cargo feature `catch-all-fields`.
249    /// If `catch-all-fields` is disabled, any additional fields that are present will be ignored.
250    #[cfg(feature = "catch-all-fields")]
251    #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
252    #[serde(flatten)]
253    #[cfg_attr(feature = "builders", builder(default))]
254    pub other: serde_json::Map<String, Value>,
255}
256
257#[non_exhaustive]
258#[cfg_attr(feature = "builders", derive(Builder))]
259#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
260#[serde(rename_all = "camelCase")]
261pub struct UserIdentity {
262    #[serde(default)]
263    pub type_: String,
264    #[serde(default)]
265    pub principal_id: String,
266    /// Catchall to catch any additional fields that were present but not explicitly defined by this struct.
267    /// Enabled with Cargo feature `catch-all-fields`.
268    /// If `catch-all-fields` is disabled, any additional fields that are present will be ignored.
269    #[cfg(feature = "catch-all-fields")]
270    #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
271    #[serde(flatten)]
272    #[cfg_attr(feature = "builders", builder(default))]
273    pub other: serde_json::Map<String, Value>,
274}
275
276/// `DynamoDbStreamRecord` represents a description of a single data modification that was performed on an item
277/// in a DynamoDB table.
278#[non_exhaustive]
279#[cfg_attr(feature = "builders", derive(Builder))]
280#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
281#[serde(rename_all = "camelCase")]
282pub struct StreamRecord {
283    /// The approximate date and time when the stream record was created, in UNIX
284    /// epoch time (<http://www.epochconverter.com/>) format. Might not be present in
285    /// the record: <https://github.com/aws/aws-lambda-rust-runtime/issues/889>
286    #[serde(rename = "ApproximateCreationDateTime")]
287    #[serde(with = "float_unix_epoch")]
288    #[serde(default)]
289    pub approximate_creation_date_time: DateTime<Utc>,
290    /// The primary key attribute(s) for the DynamoDB item that was modified.
291    #[serde(deserialize_with = "deserialize_nullish")]
292    #[serde(default)]
293    #[serde(rename = "Keys")]
294    pub keys: serde_dynamo::Item,
295    /// The item in the DynamoDB table as it appeared after it was modified.
296    #[serde(deserialize_with = "deserialize_nullish")]
297    #[serde(default)]
298    #[serde(rename = "NewImage")]
299    pub new_image: serde_dynamo::Item,
300    /// The item in the DynamoDB table as it appeared before it was modified.
301    #[serde(deserialize_with = "deserialize_nullish")]
302    #[serde(default)]
303    #[serde(rename = "OldImage")]
304    pub old_image: serde_dynamo::Item,
305    /// The sequence number of the stream record.
306    #[serde(default)]
307    #[serde(rename = "SequenceNumber")]
308    pub sequence_number: Option<String>,
309    /// The size of the stream record, in bytes.
310    #[serde(rename = "SizeBytes")]
311    pub size_bytes: i64,
312    /// The type of data from the modified DynamoDB item that was captured in this
313    /// stream record.
314    #[serde(default)]
315    #[serde(rename = "StreamViewType")]
316    pub stream_view_type: Option<StreamViewType>,
317    /// Catchall to catch any additional fields that were present but not explicitly defined by this struct.
318    /// Enabled with Cargo feature `catch-all-fields`.
319    /// If `catch-all-fields` is disabled, any additional fields that are present will be ignored.
320    #[cfg(feature = "catch-all-fields")]
321    #[cfg_attr(docsrs, doc(cfg(feature = "catch-all-fields")))]
322    #[serde(flatten)]
323    #[cfg_attr(feature = "builders", builder(default))]
324    pub other: serde_json::Map<String, Value>,
325}
326
/// Fixture-driven (de)serialization tests for the DynamoDB event types.
#[cfg(test)]
mod test {
    use super::*;
    use crate::fixtures::verify_serde_roundtrip;
    use chrono::TimeZone;

    /// Parses the example event, checks that it survives a serialize/deserialize
    /// round trip, and checks the creation timestamp of the last record.
    #[test]
    #[cfg(feature = "dynamodb")]
    fn example_dynamodb_event() {
        let data = include_bytes!("../../fixtures/example-dynamodb-event.json");
        let mut parsed: Event = serde_json::from_slice(data).unwrap();
        let output: String = serde_json::to_string(&parsed).unwrap();
        let reparsed: Event = serde_json::from_slice(output.as_bytes()).unwrap();
        assert_eq!(parsed, reparsed);

        let event = parsed.records.pop().unwrap();
        // `with_ymd_and_hms` is the non-deprecated replacement for `ymd(..).and_hms(..)`.
        let date = Utc.with_ymd_and_hms(2016, 12, 2, 1, 27, 0).unwrap();
        assert_eq!(date, event.change.approximate_creation_date_time);
    }

    /// The null-items fixture must yield empty `keys`, `new_image` and `old_image`.
    #[test]
    #[cfg(feature = "dynamodb")]
    fn example_dynamodb_event_null_items() {
        let data = include_bytes!("../../fixtures/example-dynamodb-event-null-items.json");
        let event = verify_serde_roundtrip::<Event>(data);
        assert_eq!(0, event.records[0].change.keys.len());
        assert_eq!(0, event.records[0].change.new_image.len());
        assert_eq!(0, event.records[0].change.old_image.len());
    }

    /// A record from the optional-fields fixture round-trips, and its creation
    /// timestamp equals the UNIX epoch.
    #[test]
    #[cfg(feature = "dynamodb")]
    fn example_dynamodb_event_with_optional_fields() {
        let data = include_bytes!("../../fixtures/example-dynamodb-event-record-with-optional-fields.json");
        let parsed: EventRecord = serde_json::from_slice(data).unwrap();
        let output: String = serde_json::to_string(&parsed).unwrap();
        let reparsed: EventRecord = serde_json::from_slice(output.as_bytes()).unwrap();
        assert_eq!(parsed, reparsed);
        let date = Utc.timestamp_micros(0).unwrap(); // 1970-01-01T00:00:00Z
        assert_eq!(date, reparsed.change.approximate_creation_date_time);
    }
}
365        assert_eq!(parsed, reparsed);
366        let date = Utc.timestamp_micros(0).unwrap(); // 1970-01-01T00:00:00Z
367        assert_eq!(date, reparsed.change.approximate_creation_date_time);
368    }
369}